当 IdempotentReceiverInterceptor 丢弃消息时流挂起(在第 4 条消息之后)
Flow hangs when IdempotentReceiverInterceptor discards the message(after 4-th message)
我有以下流程:
return flow -> flow.channel(inputChannel())
...
.gateway(childFlow, addMyInterceptor(str)); // by name
}
Consumer<GatewayEndpointSpec> addMyInterceptor(String objectIdHeader) {
return endpointSpec -> endpointSpec.advice(addMyInterceptorInternal(objectIdHeader))
.errorChannel(errorChannel());
}
default IdempotentReceiverInterceptor addMyInterceptorInternal(String header) {
MessageProcessor<String> headerSelector = message -> headerExpression(header).apply(message);
var interceptor = new IdempotentReceiverInterceptor(new MetadataStoreSelector(headerSelector, idempotencyStore()));
interceptor.setDiscardChannel(idempotentDiscardChannel());
return interceptor;
}
当 IdempotentReceiverInterceptor
遇到消息重复时 - 我看到应用程序在第 4 条重复消息后挂起。我知道这是因为网关预期响应(如此处:)但我不知道如何 return 来自拦截器的结果。
你能帮我解释一下吗?
只要所有通道都是直接的(默认)- 即在使用队列或执行器通道的流中没有异步切换,当流可能不 return 回复
我有以下流程:
return flow -> flow.channel(inputChannel())
...
.gateway(childFlow, addMyInterceptor(str)); // by name
}
Consumer<GatewayEndpointSpec> addMyInterceptor(String objectIdHeader) {
return endpointSpec -> endpointSpec.advice(addMyInterceptorInternal(objectIdHeader))
.errorChannel(errorChannel());
}
default IdempotentReceiverInterceptor addMyInterceptorInternal(String header) {
MessageProcessor<String> headerSelector = message -> headerExpression(header).apply(message);
var interceptor = new IdempotentReceiverInterceptor(new MetadataStoreSelector(headerSelector, idempotencyStore()));
interceptor.setDiscardChannel(idempotentDiscardChannel());
return interceptor;
}
当 IdempotentReceiverInterceptor
遇到消息重复时 - 我看到应用程序在第 4 条重复消息后挂起。我知道这是因为网关预期响应(如此处:
你能帮我解释一下吗?
只要所有通道都是直接的(默认)- 即在使用队列或执行器通道的流中没有异步切换,当流可能不 return 回复