在 Spring 云流中的单个处理器中执行记录聚合并启动 spring 云任务
Performing Aggregation of records and launch spring cloud task in single Processor in Spring cloud stream
我正在尝试执行以下操作
- 聚合消息
- 正在启动 Spring 云任务
但无法将聚合消息传递给启动任务的方法。下面是一段代码
@Autowired
private TaskProcessorProperties processorProperties;
@Autowired
Processor processor;
@Autowired
private AppConfiguration appConfiguration ;
@Transformer(inputChannel = MyProcessor.intermidiate, outputChannel = Processor.OUTPUT)
public Object setupRequest(String message) {
Map<String, String> properties = new HashMap<>();
if (StringUtils.hasText(this.processorProperties.getDataSourceUrl())) {
properties.put("spring_datasource_url", this.processorProperties.getDataSourceUrl());
}
if (StringUtils.hasText(this.processorProperties.getDataSourceDriverClassName())) {
properties.put("spring_datasource_driverClassName", this.processorProperties
.getDataSourceDriverClassName());
}
if (StringUtils.hasText(this.processorProperties.getDataSourceUserName())) {
properties.put("spring_datasource_username", this.processorProperties
.getDataSourceUserName());
}
if (StringUtils.hasText(this.processorProperties.getDataSourcePassword())) {
properties.put("spring_datasource_password", this.processorProperties
.getDataSourcePassword());
}
properties.put("payload", message);
TaskLaunchRequest request = new TaskLaunchRequest(
this.processorProperties.getUri(), null, properties, null,
this.processorProperties.getApplicationName());
System.out.println("inside task launcher **************************");
System.out.println(request.toString() +"**************************");
return new GenericMessage<>(request);
}
@ServiceActivator(inputChannel = Processor.INPUT,outputChannel = MyProcessor.intermidiate)
@Bean
public MessageHandler aggregator() {
AggregatingMessageHandler aggregatingMessageHandler =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
new SimpleMessageStore(10));
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
//aggregatorFactoryBean.setMessageStore();
//aggregatingMessageHandler.setOutputChannel(processor.output());
//aggregatorFactoryBean.setDiscardChannel(processor.output());
aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
aggregatingMessageHandler.setSendTimeout(1000L);
aggregatingMessageHandler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("'FOO'"));
aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
return aggregatingMessageHandler;
}
为了在聚合器和任务启动器方法 (setupRequest(String message)) 之间传递消息,我使用的通道 MyProcessor.intermidiate 定义如下
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Indexed;
public interface MyProcessor {
String intermidiate = "intermidiate";
@Output("intermidiate")
MessageChannel intermidiate();
}
Applicaion.properties使用如下
aggregator.message-store-type=persistentMessageStore
spring.cloud.stream.bindings.input.destination=output
spring.cloud.stream.bindings.output.destination=input
它不起作用,用上面提到的方法。
在这个 class 中,如果我将频道名称从我定义的频道 MyProcessor.intermediate 更改为 Processor.input 或 Processor.output,那么任何一个都有效(基于通道名称更改为处理器。*)
我想先聚合消息,然后想在处理器中对聚合的消息启动任务,但这并没有发生
看这里:
public Object setupRequest(String message) {
因此,您期望一些 string
作为请求负载。
您的 AggregatorFactoryBean
使用 DefaultAggregatingMessageGroupProcessor
,它的作用恰恰是:
List<Object> payloads = new ArrayList<Object>(messages.size());
for (Message<?> message : messages) {
payloads.add(message.getPayload());
}
return payloads;
所以,绝对不是String
。
奇怪的是你没有显示你的配置发生了什么异常,但我假设你需要更改 setupRequest()
签名以期望 List
有效负载或者你需要提供一些自定义 MessageGroupProcessor
从您聚合的消息组中构建 String
。
我正在尝试执行以下操作
- 聚合消息
- 正在启动 Spring 云任务 但无法将聚合消息传递给启动任务的方法。下面是一段代码
@Autowired
private TaskProcessorProperties processorProperties;
@Autowired
Processor processor;
@Autowired
private AppConfiguration appConfiguration ;
@Transformer(inputChannel = MyProcessor.intermidiate, outputChannel = Processor.OUTPUT)
public Object setupRequest(String message) {
Map<String, String> properties = new HashMap<>();
if (StringUtils.hasText(this.processorProperties.getDataSourceUrl())) {
properties.put("spring_datasource_url", this.processorProperties.getDataSourceUrl());
}
if (StringUtils.hasText(this.processorProperties.getDataSourceDriverClassName())) {
properties.put("spring_datasource_driverClassName", this.processorProperties
.getDataSourceDriverClassName());
}
if (StringUtils.hasText(this.processorProperties.getDataSourceUserName())) {
properties.put("spring_datasource_username", this.processorProperties
.getDataSourceUserName());
}
if (StringUtils.hasText(this.processorProperties.getDataSourcePassword())) {
properties.put("spring_datasource_password", this.processorProperties
.getDataSourcePassword());
}
properties.put("payload", message);
TaskLaunchRequest request = new TaskLaunchRequest(
this.processorProperties.getUri(), null, properties, null,
this.processorProperties.getApplicationName());
System.out.println("inside task launcher **************************");
System.out.println(request.toString() +"**************************");
return new GenericMessage<>(request);
}
@ServiceActivator(inputChannel = Processor.INPUT,outputChannel = MyProcessor.intermidiate)
@Bean
public MessageHandler aggregator() {
AggregatingMessageHandler aggregatingMessageHandler =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
new SimpleMessageStore(10));
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
//aggregatorFactoryBean.setMessageStore();
//aggregatingMessageHandler.setOutputChannel(processor.output());
//aggregatorFactoryBean.setDiscardChannel(processor.output());
aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
aggregatingMessageHandler.setSendTimeout(1000L);
aggregatingMessageHandler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("'FOO'"));
aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
return aggregatingMessageHandler;
}
为了在聚合器和任务启动器方法 (setupRequest(String message)) 之间传递消息,我使用的通道 MyProcessor.intermidiate 定义如下
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Indexed;
public interface MyProcessor {
String intermidiate = "intermidiate";
@Output("intermidiate")
MessageChannel intermidiate();
}
Applicaion.properties使用如下
aggregator.message-store-type=persistentMessageStore
spring.cloud.stream.bindings.input.destination=output
spring.cloud.stream.bindings.output.destination=input
它不起作用,用上面提到的方法。
在这个 class 中,如果我将频道名称从我定义的频道 MyProcessor.intermediate 更改为 Processor.input 或 Processor.output,那么任何一个都有效(基于通道名称更改为处理器。*)
我想先聚合消息,然后想在处理器中对聚合的消息启动任务,但这并没有发生
看这里:
public Object setupRequest(String message) {
因此,您期望一些 string
作为请求负载。
您的 AggregatorFactoryBean
使用 DefaultAggregatingMessageGroupProcessor
,它的作用恰恰是:
List<Object> payloads = new ArrayList<Object>(messages.size());
for (Message<?> message : messages) {
payloads.add(message.getPayload());
}
return payloads;
所以,绝对不是String
。
奇怪的是你没有显示你的配置发生了什么异常,但我假设你需要更改 setupRequest()
签名以期望 List
有效负载或者你需要提供一些自定义 MessageGroupProcessor
从您聚合的消息组中构建 String
。