RSocket 适用于生成的数据,但不适用于 Spring Reactive MongoDB

RSocket works with generated data but not with Spring Reactive MongoDB

解决总结:

在当前的大多数 RSocket 示例中,即使在 SpringBoot 相关教程中,服务器端接受器也被简单地构造为一个新对象(如下面的 new MqttMessageService() )。如果您直接在接受器中生成示例内容,这很好 class 但当接受器依赖于容器中的其他 bean 时,可能会导致以下依赖注入相关的混淆。

原题:

尝试通过 Rsocket 的 Java 服务器使用 Spring Data Reactive Mongodb 存储库流式传输数据库条目时,我收到 NullPointerException。

问题是在调试过程中所有组件单独工作:我可以通过同一个 Mongodb 存储库获取请求的数据,我也可以在之间传输随机生成的数据使用 Rsocket 的同一服务器和客户端。

所以我要么遗漏了一些非常基本的东西,要么一起使用 Reactive Mongodb 和 Rsocket 可能会出现问题。

这是原始的服务器端 Rsocket 配置

@Configuration
public class RsocketConfig {

    @PostConstruct
    public void startServer() {
        RSocketFactory.receive()
                .acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
                .transport(TcpServerTransport.create(8802))
                .start()
                .block()
                .onClose()
    }
}

这里是工作服务器端 Rsocket 配置 与适当的 DI:

@Configuration
public class RsocketConfig {

    @Autowired
    MqttMessageService messageService;

    @PostConstruct
    public void startServer() {
        RSocketFactory.receive()
                .acceptor((setup, sendingSocket) -> Mono.just(messageService))
                .transport(TcpServerTransport.create(8802))
                .start()
                .block()
                .onClose()
    }
}

这是 服务器端 AbstractRSocket 实现,其中在 return service.findAll() 抛出 NullPointerException。

@Service
public class MqttMessageService extends AbstractRSocket {



    @Autowired 
    private MqttMessageEntityService service;

    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return service.findAll()
            .map(mqttMessageEntity -> DefaultPayload.create(mqttMessageEntity.toString()));

    }
}

这是反应性存储库和相关服务。服务 return 在注入服务器的 AbstractRSocket 实现时为 null,但在注入其他 classes 时工作正常:

@Service
public class MqttMessageEntityService {

    @Autowired
    private MqttMessageEntityRepository repository;

    public Flux<MqttMessageEntity> findAll() {
        return repository.findAll();
    }

}

public interface MqttMessageEntityRepository extends ReactiveMongoRepository<MqttMessageEntity, String> {

}

这是与测试内容完美配合的客户端代码:

@Configuration
public class RsocketConfig {

    @PostConstruct
    public void testRsocket() {

        RSocket rSocketClient = RSocketFactory
                .connect()
                .transport(TcpClientTransport.create(8802))
                .start()
                .block();

        rSocketClient
                .requestStream(DefaultPayload.create(""))
                .blockLast();
    }        
}

我在这里的知识水平可能有点高,而且关于这个主题的资源非常有限,所以我很感激任何关于解决方案的提示:)

关于

@PostConstruct
public void startServer() {
    RSocketFactory.receive()
            .acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
            .transport(TcpServerTransport.create(8802))
            .start()
            .block()
            .onClose();
}

您是否使用 来保持服务器活动?如果是这样,在 onClose() 之后添加另一个块。

messageEntityService是否为空?因为如果变量 topicStart 和 module 不是,那看起来是唯一可能导致错误的东西。特别是如果其他代码有效 - 我真的看不到任何会导致 RSocket 端出现问题的东西。