Spring WebFlux - 如何从数据库中获取数据以在下一步中使用
Spring WebFlux - how to get data from DB to use in the next step
我使用 Spring WebFlux (Project Reactor),我面临以下问题:
我必须从数据库中获取一些数据才能使用它们来调用另一项服务 - 一个流中的所有内容。怎么做?
public Mono<MyObj> saveObj(Mono<MyObj> obj) {
return obj
.flatMap(
ob->
Mono.zip(
repo1.save(
...),
repo2
.saveAll(...)
.collectList(),
repo3
.saveAll(...)
.collectList())
.map(this::createSpecificObject))
.doOnNext(item-> createObjAndCallAnotherService(item));
}
private void createObjAndCallAnotherService(Prot prot){
myRepository
.findById(
prot.getDomCred().stream()
.filter(Objects::nonNull)
.findFirst()
.map(ConfDomCred::getCredId)
.orElse(UUID.fromString("00000000-0000-0000-0000-000000000000")))
.doOnNext( //one value is returned from myRepository -> Flux<MyObjectWithNeededData>
confCred-> {//from this point the code is unreachable!!! - why????
Optional<ConfDomCred> confDomCred=
prot.getDomCreds().stream().filter(Objects::nonNull).findFirst();
confDomCred.ifPresent(
domCred -> {
ProtComDto com=
ProtComDto.builder()
.userName(confCred.getUsername())
.password(confCred.getPassword())
.build();
clientApiToAnotherService.callEndpintInAnotherService(com); //this is a client like Feign that invokes method in another service
});
});
}
更新
当我调用
Flux<MyObj> myFlux = myRepository
.findById(
prot.getDomCred().stream()
.filter(Objects::nonNull)
.findFirst()
.map(ConfDomCred::getCredId)
.orElse(UUID.fromString("00000000-0000-0000-0000-000000000000")));
myFlux.subscribe(e -> e.getPassword())
然后打印值
更新2
回顾一下 - 我认为下面的代码是 asynchronous/non-blocking - 我说得对吗?
在我的
ProtectionCommandService
我不得不使用 subscribe() 两次 - 只有这样我才能调用我的其他服务并将它们存储在我的对象中:commandControllerApi.createNewCommand
public Mono<Protection> saveProtection(Mono<Protection> newProtection) {
return newProtection.flatMap(
protection ->
Mono.zip(
protectorRepository.save(//some code),
domainCredentialRepository
.saveAll(//some code)
.collectList(),
protectionSetRepository
.saveAll(//some code)
.collectList())
.map(this::createNewObjectWrapper)
.doOnNext(protectionCommandService::createProtectionCommand));
}
ProtectionCommandServiceclass:
public class ProtectionCommandService {
private final ProtectionCommandStrategyFactory protectionCommandFactory;
private final CommandControllerApi commandControllerApi;
public Mono<ProtectionObjectsWrapper> createProtectionCommand(
ProtectionObjectsWrapper protection) {
ProductType productType = protection.getProtector().getProductType();
Optional<ProtectionCommandFactory> commandFactory = protectionCommandFactory.get(productType);
commandFactory
.get()
.createCommandFromProtection(protection)
.subscribe(command -> commandControllerApi.createNewCommand(command).subscribe());
return Mono.just(protection);
}
}
以及 2 个工厂之一:
@Component
@AllArgsConstructor
@Slf4j
public class VmWareProtectionCommandFactory implements ProtectionCommandFactory {
private static final Map<ProductType, CommandTypeEnum> productTypeToCommandType =
ImmutableMap.of(...//some values);
private final ConfigurationCredentialRepository configurationCredentialRepository;
@Override
public Mono<CommandDetails> createCommandFromProtection(ProtectionObjectsWrapper protection) {
Optional<DomainCredential> domainCredential =
protection.getDomainCredentials().stream().findFirst();
return configurationCredentialRepository
.findByOwnerAndId(protection.getOwner(), domainCredential.get().getCredentialId())
.map(credential -> createCommand(protection, credential, domainCredential.get()));
}
和 createCommand 方法 returns Mono 对象作为该工厂的结果。
private Mono<CommandDetails> createCommand(Protection protection
//other parameters) {
CommandDto commandDto =
buildCommandDto(protection, confCredential, domainCredentials);
String commands = JsonUtils.toJson(commandDto);
CommandDetails details = new CommandDetails();
details.setAgentId(protection.getProtector().getAgentId().toString());
details.setCommandType(///some value);
details.setArguments(//some value);
return Mono.just(details);
更新3
我调用所有内容的主要方法已经做了一些更改:
public Mono<MyObj> saveObj(Mono<MyObj> obj) {
return obj
.flatMap(
ob->
Mono.zip(
repo1.save(
...),
repo2
.saveAll(...)
.collectList(),
repo3
.saveAll(...)
.collectList())
.map(this::wrapIntoAnotherObject)
.flatMap(protectionCommandService::createProtectionCommand)
.map(this::createMyObj));
停止断链
这是一个 纯函数 它 return 是什么东西,而且无论我们给它什么,它总是 return 是相同的东西。没有副作用。
public Mono<Integer> fooBar(int number) {
return Mono.just(number);
}
我们可以调用它并继续下去,因为它 return 有点东西。
foobar(5).flatMap(number -> { ... }).subscribe();
这是一个非纯函数,我们不能链上,我们正在断链。我们无法订阅,在我们订阅之前什么都不会发生。
public void fooBar(int number) {
Mono.just(number)
}
fooBar(5).subscribe(); // compiler error
但是我想要一个void函数,我想要,我想要我想要.... wuuaaa wuaaaa
我们总是需要 returned 一些东西,以便我们可以触发链中的下一个部分。不然程序怎么知道什么时候 运行 下一节呢?但是假设我们想忽略 return 值并只触发下一部分。那么我们可以 return a Mono<Void>
.
public Mono<Void> fooBar(int number) {
System.out.println("Number: " + number);
return Mono.empty();
}
foobar(5).subscribe(); // Will work we have not broken the chain
你的例子:
private void createObjAndCallAnotherService(Prot prot){
myRepository.findById( ... ) // breaking the chain, no return
}
以及其他一些提示:
- 正确命名您的对象,而不是
MyObj
和 saveObj
、myRepository
- 避免使用长名称
createObjAndCallAnotherService
- 遵循单一职责
createObjAndCallAnotherService
这是做两件事,因此得名。
- 创建私有函数或辅助函数以使您的代码更具可读性,但不要内联所有内容。
更新
你还在犯同样的错误。
commandFactory // Here you are breaking the chain because you are ignoring the return type
.get()
.createCommandFromProtection(protection)
.subscribe(command -> commandControllerApi.createNewCommand(command)
.subscribe()); // DONT SUBSCRIBE you are not the consumer, the client that initiated the call is the subscriber
return Mono.just(protection);
您想做的是:
return commandFactory.get()
.createCommandFrom(protection)
.flatMap(command -> commandControllerApi.createNewCommand(command))
.thenReturn(protection);
停止打破链条,不要订阅,除非你的服务是最终消费者,或者发起调用的人。
我使用 Spring WebFlux (Project Reactor),我面临以下问题: 我必须从数据库中获取一些数据才能使用它们来调用另一项服务 - 一个流中的所有内容。怎么做?
public Mono<MyObj> saveObj(Mono<MyObj> obj) {
return obj
.flatMap(
ob->
Mono.zip(
repo1.save(
...),
repo2
.saveAll(...)
.collectList(),
repo3
.saveAll(...)
.collectList())
.map(this::createSpecificObject))
.doOnNext(item-> createObjAndCallAnotherService(item));
}
private void createObjAndCallAnotherService(Prot prot){
myRepository
.findById(
prot.getDomCred().stream()
.filter(Objects::nonNull)
.findFirst()
.map(ConfDomCred::getCredId)
.orElse(UUID.fromString("00000000-0000-0000-0000-000000000000")))
.doOnNext( //one value is returned from myRepository -> Flux<MyObjectWithNeededData>
confCred-> {//from this point the code is unreachable!!! - why????
Optional<ConfDomCred> confDomCred=
prot.getDomCreds().stream().filter(Objects::nonNull).findFirst();
confDomCred.ifPresent(
domCred -> {
ProtComDto com=
ProtComDto.builder()
.userName(confCred.getUsername())
.password(confCred.getPassword())
.build();
clientApiToAnotherService.callEndpintInAnotherService(com); //this is a client like Feign that invokes method in another service
});
});
}
更新
当我调用
Flux<MyObj> myFlux = myRepository
.findById(
prot.getDomCred().stream()
.filter(Objects::nonNull)
.findFirst()
.map(ConfDomCred::getCredId)
.orElse(UUID.fromString("00000000-0000-0000-0000-000000000000")));
myFlux.subscribe(e -> e.getPassword())
然后打印值
更新2
回顾一下 - 我认为下面的代码是 asynchronous/non-blocking - 我说得对吗? 在我的
ProtectionCommandService
我不得不使用 subscribe() 两次 - 只有这样我才能调用我的其他服务并将它们存储在我的对象中:commandControllerApi.createNewCommand
public Mono<Protection> saveProtection(Mono<Protection> newProtection) {
return newProtection.flatMap(
protection ->
Mono.zip(
protectorRepository.save(//some code),
domainCredentialRepository
.saveAll(//some code)
.collectList(),
protectionSetRepository
.saveAll(//some code)
.collectList())
.map(this::createNewObjectWrapper)
.doOnNext(protectionCommandService::createProtectionCommand));
}
ProtectionCommandServiceclass:
public class ProtectionCommandService {
private final ProtectionCommandStrategyFactory protectionCommandFactory;
private final CommandControllerApi commandControllerApi;
public Mono<ProtectionObjectsWrapper> createProtectionCommand(
ProtectionObjectsWrapper protection) {
ProductType productType = protection.getProtector().getProductType();
Optional<ProtectionCommandFactory> commandFactory = protectionCommandFactory.get(productType);
commandFactory
.get()
.createCommandFromProtection(protection)
.subscribe(command -> commandControllerApi.createNewCommand(command).subscribe());
return Mono.just(protection);
}
}
以及 2 个工厂之一:
@Component
@AllArgsConstructor
@Slf4j
public class VmWareProtectionCommandFactory implements ProtectionCommandFactory {
private static final Map<ProductType, CommandTypeEnum> productTypeToCommandType =
ImmutableMap.of(...//some values);
private final ConfigurationCredentialRepository configurationCredentialRepository;
@Override
public Mono<CommandDetails> createCommandFromProtection(ProtectionObjectsWrapper protection) {
Optional<DomainCredential> domainCredential =
protection.getDomainCredentials().stream().findFirst();
return configurationCredentialRepository
.findByOwnerAndId(protection.getOwner(), domainCredential.get().getCredentialId())
.map(credential -> createCommand(protection, credential, domainCredential.get()));
}
和 createCommand 方法 returns Mono 对象作为该工厂的结果。
private Mono<CommandDetails> createCommand(Protection protection
//other parameters) {
CommandDto commandDto =
buildCommandDto(protection, confCredential, domainCredentials);
String commands = JsonUtils.toJson(commandDto);
CommandDetails details = new CommandDetails();
details.setAgentId(protection.getProtector().getAgentId().toString());
details.setCommandType(///some value);
details.setArguments(//some value);
return Mono.just(details);
更新3
我调用所有内容的主要方法已经做了一些更改:
public Mono<MyObj> saveObj(Mono<MyObj> obj) {
return obj
.flatMap(
ob->
Mono.zip(
repo1.save(
...),
repo2
.saveAll(...)
.collectList(),
repo3
.saveAll(...)
.collectList())
.map(this::wrapIntoAnotherObject)
.flatMap(protectionCommandService::createProtectionCommand)
.map(this::createMyObj));
停止断链
这是一个 纯函数 它 return 是什么东西,而且无论我们给它什么,它总是 return 是相同的东西。没有副作用。
public Mono<Integer> fooBar(int number) {
return Mono.just(number);
}
我们可以调用它并继续下去,因为它 return 有点东西。
foobar(5).flatMap(number -> { ... }).subscribe();
这是一个非纯函数,我们不能链上,我们正在断链。我们无法订阅,在我们订阅之前什么都不会发生。
public void fooBar(int number) {
Mono.just(number)
}
fooBar(5).subscribe(); // compiler error
但是我想要一个void函数,我想要,我想要我想要.... wuuaaa wuaaaa
我们总是需要 returned 一些东西,以便我们可以触发链中的下一个部分。不然程序怎么知道什么时候 运行 下一节呢?但是假设我们想忽略 return 值并只触发下一部分。那么我们可以 return a Mono<Void>
.
public Mono<Void> fooBar(int number) {
System.out.println("Number: " + number);
return Mono.empty();
}
foobar(5).subscribe(); // Will work we have not broken the chain
你的例子:
private void createObjAndCallAnotherService(Prot prot){
myRepository.findById( ... ) // breaking the chain, no return
}
以及其他一些提示:
- 正确命名您的对象,而不是
MyObj
和saveObj
、myRepository
- 避免使用长名称
createObjAndCallAnotherService
- 遵循单一职责
createObjAndCallAnotherService
这是做两件事,因此得名。 - 创建私有函数或辅助函数以使您的代码更具可读性,但不要内联所有内容。
更新
你还在犯同样的错误。
commandFactory // Here you are breaking the chain because you are ignoring the return type
.get()
.createCommandFromProtection(protection)
.subscribe(command -> commandControllerApi.createNewCommand(command)
.subscribe()); // DONT SUBSCRIBE you are not the consumer, the client that initiated the call is the subscriber
return Mono.just(protection);
您想做的是:
return commandFactory.get()
.createCommandFrom(protection)
.flatMap(command -> commandControllerApi.createNewCommand(command))
.thenReturn(protection);
停止打破链条,不要订阅,除非你的服务是最终消费者,或者发起调用的人。