Spring RSocketRequester issue when created with "wrap" function
It works fine when I create the RSocketRequester like this:
    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.builder()
                .rsocketStrategies(rSocketStrategies)
                .connectTcp("localhost", 7000)
                .block();
    }
But when I create it like this, an exception is thrown when I try to send a message:
    @Bean
    RSocket rSocket() {
        return RSocketFactory
                .connect()
                .transport(TcpClientTransport.create(7000))
                .start()
                .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(
                rSocket(),
                MediaType.APPLICATION_CBOR,
                MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()),
                rSocketStrategies);
    }
io.rsocket.exceptions.ApplicationErrorException: No handler for destination ''
at io.rsocket.exceptions.Exceptions.from(Exceptions.java:45) ~[rsocket-core-1.0.0-RC5.jar:na]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Handler com.issoft.rnd.ms2.controller.TestController#test4() [DispatcherHandler]
|_ checkpoint ⇢ HTTP GET "/test4" [ExceptionHandlingWebHandler]
Stack trace:
at io.rsocket.exceptions.Exceptions.from(Exceptions.java:45) ~[rsocket-core-1.0.0-RC5.jar:na]
at io.rsocket.RSocketRequester.handleFrame(RSocketRequester.java:556) ~[rsocket-core-1.0.0-RC5.jar:na]
at io.rsocket.RSocketRequester.handleIncomingFrames(RSocketRequester.java:516) ~[rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:8134) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new(ClientServerInputMultiplexer.java:116) ~[rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:213) ~[reactor-netty-0.9.1.RELEASE.jar:0.9.1.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:346) ~[reactor-netty-0.9.1.RELEASE.jar:0.9.1.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348) ~[reactor-netty-0.9.1.RELEASE.jar:0.9.1.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-0.9.1.RELEASE.jar:0.9.1.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326) ~[netty-codec-4.1.43.Final.jar:4.1.43.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300) ~[netty-codec-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:1050) ~[netty-common-4.1.43.Final.jar:4.1.43.Final]
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.43.Final.jar:4.1.43.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.43.Final.jar:4.1.43.Final]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Send function:
    public Mono<String> request(String route, String data, Class<String> clazz) {
        return rSocketRequester.route(route).data(data).retrieveMono(clazz);
    }
Server:
    @Controller
    @AllArgsConstructor
    public class RTestController {
        private EchoService echoService;

        @MessageMapping("test")
        Mono<String> test(String value) {
            return echoService.echo(value);
        }
    }
Spring Boot version: 2.2.1.RELEASE
Note: the MIME types are taken from the first, working version.
What is wrong with the wrap function?
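TL;DR: this is not a problem with Spring, but with the way you create the client RSocket connection.
The RSocketRequester.wrap method is generally used on the server side, where the server wants to send requests to the other side once a client has initiated a connection. The Javadoc says you can also use it on the client side, and that is a perfectly valid use case.
RSocketRequester.wrap takes the expected data MIME type and metadata MIME type as parameters. These are normally negotiated by both sides with RSocket SETUP/METADATA_PUSH frames when the connection is established; once the connection is up, that is settled. The wrap method needs these parameters because we cannot easily extract that information from an existing RSocket.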
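In your case, the connection was created with RSocketFactory without providing that information (the ClientRSocketFactory class has several methods for this). So I think the metadata cannot be encoded properly in the request, and as a result the server cannot read the routing information.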
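To illustrate why a mismatch in the metadata MIME type can surface as an empty destination, here is a simplified, dependency-free model of the wire format. It is only a sketch, not Spring's or rsocket-java's actual implementation: the class RouteMetadataSketch and its methods are hypothetical names, and it assumes that a connection set up without an explicit metadata MIME type is treated by the server as "application/binary". The byte layout follows the RSocket composite metadata and routing metadata extensions, where 0x7E is the well-known ID of message/x.rsocket.routing.v0.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified model of route encoding; not the Spring implementation.
final class RouteMetadataSketch {
    static final String COMPOSITE = "message/x.rsocket.composite-metadata.v0";
    static final int ROUTING_ID = 0x7E; // well-known id of message/x.rsocket.routing.v0

    /** Routing metadata with a single tag: 1-byte length followed by the UTF-8 route. */
    static byte[] encodeRoute(String route) {
        byte[] tag = route.getBytes(StandardCharsets.UTF_8);
        if (tag.length > 255) {
            throw new IllegalArgumentException("route longer than 255 bytes");
        }
        byte[] out = new byte[1 + tag.length];
        out[0] = (byte) tag.length;
        System.arraycopy(tag, 0, out, 1, tag.length);
        return out;
    }

    /** Composite entry: (0x80 | id), 24-bit big-endian payload length, payload. */
    static byte[] encodeCompositeEntry(int wellKnownId, byte[] payload) {
        byte[] out = new byte[4 + payload.length];
        out[0] = (byte) (0x80 | wellKnownId);
        out[1] = (byte) (payload.length >> 16);
        out[2] = (byte) (payload.length >> 8);
        out[3] = (byte) payload.length;
        System.arraycopy(payload, 0, out, 4, payload.length);
        return out;
    }

    /**
     * Receiver side: the metadata is only parsed as composite metadata when the
     * connection was set up with that MIME type; otherwise no route is found.
     */
    static String extractRoute(String connectionMetadataMimeType, byte[] metadata) {
        if (!COMPOSITE.equals(connectionMetadataMimeType)) {
            return ""; // assumption: unknown metadata type yields an empty destination
        }
        int pos = 0;
        while (pos < metadata.length) {
            int header = metadata[pos++] & 0xFF;
            boolean wellKnown = (header & 0x80) != 0;
            if (!wellKnown) {
                pos += header + 1; // skip an explicit MIME string of (header + 1) bytes
            }
            if (pos + 3 > metadata.length) {
                break;
            }
            int len = ((metadata[pos] & 0xFF) << 16)
                    | ((metadata[pos + 1] & 0xFF) << 8)
                    | (metadata[pos + 2] & 0xFF);
            pos += 3;
            if (pos + len > metadata.length) {
                break;
            }
            if (wellKnown && (header & 0x7F) == ROUTING_ID && len > 0) {
                int tagLen = metadata[pos] & 0xFF;
                if (1 + tagLen <= len) {
                    return new String(metadata, pos + 1, tagLen, StandardCharsets.UTF_8);
                }
            }
            pos += len; // not a routing entry, move on to the next one
        }
        return "";
    }

    public static void main(String[] args) {
        byte[] metadata = encodeCompositeEntry(ROUTING_ID, encodeRoute("test"));
        // Connection set up with composite metadata: the route is found.
        System.out.println("[" + extractRoute(COMPOSITE, metadata) + "]");
        // Connection set up with the assumed default: the same bytes yield no route.
        System.out.println("[" + extractRoute("application/binary", metadata) + "]");
    }
}
```

In this model the requester produces the same bytes either way, because the composite MIME type is what gets passed to wrap. The receiver only looks for the route when the connection itself was set up with the composite MIME type, which is what setting metadataMimeType on the factory changes.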
Unless you have very specific requirements, I would recommend creating the client RSocket with the RSocketRequester.Builder API, since it is much easier to get right.
Adding dataMimeType and metadataMimeType helped.
    @Bean
    RSocket rSocket() {
        return RSocketFactory
                .connect()
                .dataMimeType(MediaType.APPLICATION_CBOR_VALUE)
                .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString())
                .transport(TcpClientTransport.create(7000))
                .start()
                .block();
    }
The original reason for using the wrap method was to use LoadBalancedRSocketMono, which provides client-side load balancing and reconnection out of the box.
    LoadBalancedRSocketMono.create(Flux.just(Collections.singleton(new RSocketSupplier(() ->
            RSocketFactory
                    .connect()
                    .dataMimeType(MediaType.APPLICATION_CBOR_VALUE)
                    .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString())
                    .frameDecoder(PayloadDecoder.ZERO_COPY)
                    .transport(TcpClientTransport.create("localhost", 7000))
                    .start()
                    .doOnNext(rSocket -> {
                        // Wrap each newly established connection in a Spring requester.
                        this.rSocketRequester = RSocketRequester.wrap(
                                rSocket,
                                MediaType.APPLICATION_CBOR,
                                MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()),
                                rSocketStrategies);
                    })
    )))
    ).subscribe();