Issue with Spring WebFlux stream in Spring Cloud Data Flow
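I have a custom Spring Cloud Data Flow source application that connects to a REST endpoint, buffers the data, and sends it to a custom sink. I am using Spring WebFlux. If I call block(), collect all available data, and send it to the message channel, everything works. The problem is that I need to send the data in chunks, so I want to buffer the stream into chunks of, say, 100K records and send each chunk to the message channel. When I try to buffer and send the chunks to the message channel, it fails.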
Working Code (Blocking call to collect all data and send at once)
@InboundChannelAdapter(
        value = Source.OUTPUT,
        poller = @Poller(fixedDelay = "120000", maxMessagesPerPoll = "100000"))
public MessageSource<List<MyDataDTO>> sendEdpiMessageToChannel() {
    // Blocks until the whole response body has been collected into one list.
    List<MyDataDTO> list = WebClient.create()
            .get()
            .uri(builder -> builder.scheme("https")
                    .host("server_name")
                    .path("/api/endpoint")
                    .build())
            .header("Cookie", cookie)
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .flatMapMany(clientResponse -> clientResponse.bodyToFlux(MyDataDTO.class))
            .collectList()
            .block();
    return () -> MessageBuilder.withPayload(list).build();
}
Now I want to buffer the same data and send it in chunks instead of collecting everything with a blocking call, but this fails.
SOURCE CODE
@Scheduled(fixedDelay = 120000)
public MessageSource<List<MyDataDTO>> sendEdpiMessageToChannel() {
    List<MyDataDTO> list = WebClient.create()
            .get()
            .uri(builder -> builder.scheme("https")
                    .host("server_name")
                    .path("/api/endpoint")
                    .build())
            .header("Cookie", cookie)
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .flatMapMany(clientResponse -> clientResponse.bodyToFlux(MyDataDTO.class)) //.subscribeOn(Schedulers.parallel())
            .buffer(100000)
            .concatMap(streamRecordsList -> Mono.fromRunnable(() -> sendToChannel(streamRecordsList)));
}

@InboundChannelAdapter(
        value = Source.OUTPUT
)
public MessageSource<List<MyDataDTO>> sendToChannel(List<MyDataDTO> list) {
    return () -> MessageBuilder.withPayload(list).build();
}
STACK TRACE
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] 2019-09-16 06:27:47.859 ERROR 9 --- [scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Failed to invoke method; nested exception is java.lang.IllegalArgumentException: wrong number of arguments
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:117)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:160)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:360)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at java.lang.Thread.run(Thread.java:748)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] Caused by: java.lang.IllegalArgumentException: wrong number of arguments
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at java.lang.reflect.Method.invoke(Method.java:498)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:246)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:230)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:114)
2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] ... 18 more
2019-09-16T02:27:48.533-04:00 [CELL/0] [OUT] Container became healthy
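The trace above points at a reflective call from MethodInvokingMessageSource.doReceive, which suggests the poller invokes the annotated method without any arguments. Under that reading, a method that declares a List parameter would fail in exactly this way. The following is only a minimal plain-Java sketch of that mechanism, not Spring Integration code; the class WrongArgsDemo and the helper invokeWithoutArgs are hypothetical names made up for illustration.

```java
import java.lang.reflect.Method;
import java.util.List;

class WrongArgsDemo {
    // Hypothetical stand-in for an adapter method that declares a parameter.
    public static String sendToChannel(List<String> list) {
        return "sent " + list.size();
    }

    // Calls sendToChannel reflectively with no arguments, the way a
    // no-arg poller would, and returns the resulting error message.
    public static String invokeWithoutArgs() {
        try {
            Method m = WrongArgsDemo.class.getDeclaredMethod("sendToChannel", List.class);
            m.invoke(null); // static method, zero arguments supplied
            return "no error";
        } catch (IllegalArgumentException e) {
            return e.getMessage(); // begins with "wrong number of arguments"
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(invokeWithoutArgs());
    }
}
```

If this reading is right, the method carrying @InboundChannelAdapter has to be callable with no arguments, so the chunks would need to reach the channel some other way.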
I got this working with the code below:
@Scheduled(fixedDelay = 120000)
public void sendEdpiMessageToChannel() {
    WebClient.create()
            .get()
            .uri(builder -> builder.scheme("https")
                    .host("server_name")
                    .path("/api/endpoint")
                    .build())
            .header("Cookie", cookie)
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .flatMapMany(clientResponse -> clientResponse.bodyToFlux(MyDataDTO.class)) //.subscribeOn(Schedulers.parallel())
            .buffer(100000)
            // Send each chunk of up to 100000 records, one after the other.
            .concatMap(streamRecordsList -> Mono.fromRunnable(() -> sendToChannel(streamRecordsList)))
            // Nothing runs until the pipeline is subscribed.
            .subscribe();
}

// Plain method (no @InboundChannelAdapter) that writes one chunk to the output channel.
public void sendToChannel(List<MyDataDTO> list) {
    this.source.output().send(MessageBuilder.withPayload(list).build());
}
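For readers less familiar with buffer(100000) followed by a per-chunk send, the chunking behaviour can be sketched without Reactor. This is a plain-Java stand-in, not Reactor's implementation; the class ChunkingSketch, the method sendInChunks, and the Consumer used as the sender are all hypothetical names chosen for illustration. It groups records into lists of at most chunkSize elements and hands each list to the sender in order, with a smaller final chunk when the total is not a multiple of chunkSize.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class ChunkingSketch {
    // Groups records into chunks of at most chunkSize and passes each chunk
    // to sender in order. Returns the number of chunks sent.
    public static <T> int sendInChunks(Iterable<T> records, int chunkSize, Consumer<List<T>> sender) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<T> chunk = new ArrayList<>();
        int sent = 0;
        for (T record : records) {
            chunk.add(record);
            if (chunk.size() == chunkSize) {
                sender.accept(chunk);
                sent++;
                chunk = new ArrayList<>(); // start a fresh chunk
            }
        }
        if (!chunk.isEmpty()) { // trailing partial chunk
            sender.accept(chunk);
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        List<Integer> records = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        int chunks = sendInChunks(records, 3, c -> System.out.println("sending " + c));
        System.out.println(chunks + " chunks sent");
    }
}
```

In the real application the sender would be the sendToChannel method, and the records would arrive from the WebClient response rather than from an in-memory list.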