如何使用 wireTap 传递 header?
How to pass header using wireTap?
现在我有以下流程:
flow -> flow.channel(some_channel())
.....
.gateway(anotherFlow, idempotentByHeader(OBJECT_ID_HEADER));
Consumer<GatewayEndpointSpec> idempotentByHeader(String objectIdHeader) {
return endpointSpec -> endpointSpec.advice(idempotentByHeaderInterceptor(objectIdHeader)).errorChannel(errorChannel());
}
default IdempotentReceiverInterceptor idempotentByHeaderInterceptor(String header) {
MessageProcessor<String> headerSelector = message -> headerExpression(header).apply(message);
var interceptor = new IdempotentReceiverInterceptor(new MetadataStoreSelector(headerSelector, idempotencyStore()));
interceptor.setDiscardChannel(idempotentDiscardChannel());
return interceptor;
}
这里的问题在于:
anotherFlow
以 MessageHandler
完成,即 void
所以 anotherFlow 没有 return 任何东西。
我尝试使用以下方法:
flow -> flow.channel(some_channel())
.....
.wireTap(anotherFlow, idempotentByHeader(OBJECT_ID_HEADER));
但编译器因 idempotentByHeader
return 类型而抱怨,所以我尝试执行以下操作:
default Consumer<WireTapSpec> idempotentByHeader(String objectIdHeader) {
return endpointSpec -> endpointSpec.advice(idempotentByHeaderInterceptor(objectIdHeader)).errorChannel(errorChannel());
}
但是 WireTapSpec 没有建议方法。
如何解决?
P.S.
我能够通过改变 return 类型的 idempotentByHeader
来编写
.wireTap(anotherFlow)
.enrich(idempotentByHeader(OBJECT_ID_HEADER));
但现在应用程序无法启动,因为:
Caused by: java.lang.IllegalStateException: If the errorChannel is set, then the requestChannel must not be null
at org.springframework.util.Assert.state(Assert.java:73)
at org.springframework.integration.transformer.ContentEnricher.doInit(ContentEnricher.java:277)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.onInit(AbstractReplyProducingMessageHandler.java:98)
at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:214)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1862)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1799)
... 42 common frames omitted
好的。您忽略了 WireTap
是 频道拦截器 这一事实。它不是一个 endpoint 像能够接受幂等接收器拦截器的网关。
我不确定您 idempotentByHeaderInterceptor
的目标是什么,但 headers 确实在将要发送到 WireTap 的消息中传递。因此,您可以访问订阅此 WireTap 的 sub-flow 中的 headers。
你最近的 enrich()
样本也让我有点困惑。在使用网关之前,您试图避免通过 idempotentByHeaderInterceptor
向 sub-flow 发送相同的消息,但现在您无条件地向 wireTap
发送,并且仅在此之后您才申请 idempotentByHeaderInterceptor
.
那么,您 idempotentByHeaderInterceptor
的目标是什么?您想将其应用到什么地方?
现在我有以下流程:
flow -> flow.channel(some_channel())
.....
.gateway(anotherFlow, idempotentByHeader(OBJECT_ID_HEADER));
Consumer<GatewayEndpointSpec> idempotentByHeader(String objectIdHeader) {
return endpointSpec -> endpointSpec.advice(idempotentByHeaderInterceptor(objectIdHeader)).errorChannel(errorChannel());
}
default IdempotentReceiverInterceptor idempotentByHeaderInterceptor(String header) {
MessageProcessor<String> headerSelector = message -> headerExpression(header).apply(message);
var interceptor = new IdempotentReceiverInterceptor(new MetadataStoreSelector(headerSelector, idempotencyStore()));
interceptor.setDiscardChannel(idempotentDiscardChannel());
return interceptor;
}
这里的问题在于:
anotherFlow
以 MessageHandler
完成,即 void
所以 anotherFlow 没有 return 任何东西。
我尝试使用以下方法:
flow -> flow.channel(some_channel())
.....
.wireTap(anotherFlow, idempotentByHeader(OBJECT_ID_HEADER));
但编译器因 idempotentByHeader
return 类型而抱怨,所以我尝试执行以下操作:
default Consumer<WireTapSpec> idempotentByHeader(String objectIdHeader) {
return endpointSpec -> endpointSpec.advice(idempotentByHeaderInterceptor(objectIdHeader)).errorChannel(errorChannel());
}
但是 WireTapSpec 没有建议方法。
如何解决?
P.S.
我能够通过改变 return 类型的 idempotentByHeader
来编写 .wireTap(anotherFlow)
.enrich(idempotentByHeader(OBJECT_ID_HEADER));
但现在应用程序无法启动,因为:
Caused by: java.lang.IllegalStateException: If the errorChannel is set, then the requestChannel must not be null
at org.springframework.util.Assert.state(Assert.java:73)
at org.springframework.integration.transformer.ContentEnricher.doInit(ContentEnricher.java:277)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.onInit(AbstractReplyProducingMessageHandler.java:98)
at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:214)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1862)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1799)
... 42 common frames omitted
好的。您忽略了 WireTap
是 频道拦截器 这一事实。它不是一个 endpoint 像能够接受幂等接收器拦截器的网关。
我不确定您 idempotentByHeaderInterceptor
的目标是什么,但 headers 确实在将要发送到 WireTap 的消息中传递。因此,您可以访问订阅此 WireTap 的 sub-flow 中的 headers。
你最近的 enrich()
样本也让我有点困惑。在使用网关之前,您试图避免通过 idempotentByHeaderInterceptor
向 sub-flow 发送相同的消息,但现在您无条件地向 wireTap
发送,并且仅在此之后您才申请 idempotentByHeaderInterceptor
.
那么,您 idempotentByHeaderInterceptor
的目标是什么?您想将其应用到什么地方?