如何将 if-else 语句包含到反应流中
How to include if-else statements into reactive flow
我有一个 Spring Webflux 响应式服务,它接收一个 DTO 并将其插入多个 table。
有时我们可能需要根据传入的 DTO 跳过插入某些 tables。
这些是要求:
- 创建新客户
- 如果 DTO 中存在客户推荐,则创建新的客户推荐。
- 如果 DTO 中存在,则创建客户端次要联系人
- 如果 DTO 中存在,则创建客户端电话
问题:-
- 不确定如何在反应流中应用 if 条件。
- 有更好的方法吗?
- 这里,除了第一个,所有其他操作都可以运行并行。
public Mono<ServerResponse> createClientProfile(ServerRequest request) {
return secContext.retrieveUser().flatMap(usr -> {
return request.bodyToMono(ClientDto.class).flatMap(client -> {
return toNewClient(client, usr).flatMap(clientRepository::save).flatMap(clientRes -> {
return toNewClientReferral(clientRes.getClientId(), client.getDiscount(), usr)
.flatMap(clientReferralRepository::save).flatMap(clientReferralRes -> {
return toNewClientSyContact(clientRes.getClientId(), client.getSecondary(), usr)
.flatMap(clientSyContactRepository::save).flatMap(clientSyContactRes -> {
return clientPhoneRepository
.saveAll(toNewClientPhone(clientRes.getClientId(), client.getPhones(), usr))
.collectList().flatMap(phoneRes -> {
return ServerResponse
.created(URI.create(String.format(CLIENT_URI_FORMAT,
clientRes.getClientId())))
.contentType(APPLICATION_JSON).build();
});
});
});
});
});
});
}
private Mono<Referral> toNewClientReferral(final long clientId, final Discount dto) {
Referral referral = Referral.of(clientId,
dto.getName(), dto.getType(), dto.getAmount(), dto.getStatus());
return Mono.just(referral);
}
client.getDiscount() can be null,
client.getSecondary() can be null,
client.getPhones() can be empty.
我用 3 种不同的方法分离了流程。
public void createSyContact(ServerRequest request, long clientId) {
secContext.retrieveUser().flatMap(usr -> {
return request.bodyToMono(ClientDto.class).flatMap(client -> {
if (client.getSecondary() != null) {
return toNewClientSyContact(clientId, client.getSecondary(), usr)
.flatMap(clientSyContactRepository::save).flatMap(clientRes -> {
return Mono.just(clientRes.getClientId());
});
} else {
return Mono.empty();
}
});
});
}
public void createReferral(ServerRequest request, long clientId) {
secContext.retrieveUser().flatMap(usr -> {
return request.bodyToMono(ClientDto.class).flatMap(client -> {
if (client.getDiscount() != null) {
return toNewClientReferral(clientId, client.getDiscount(), usr)
.flatMap(clientReferralRepository::save).flatMap(clientRes -> {
return Mono.just(clientRes.getClientId());
});
} else {
return Mono.empty();
}
});
});
}
public Mono<Long> createClientWithPhones(ServerRequest request) {
return secContext.retrieveUser().flatMap(usr -> {
return request.bodyToMono(ClientDto.class).flatMap(client -> {
return toNewClient(client, usr).flatMap(clientRepository::save).flatMap(clientRes -> {
return clientPhoneRepository
.saveAll(toNewClientPhone(clientRes.getClientId(), client.getPhones(), usr)).collectList()
.flatMap(phoneRes -> {
return Mono.just(clientRes.getClientId());
});
});
});
});
}
在这里,createClientWithPhones 是强制性的,所以没有 if 检查那里。但是其他 2 个方法 createReferral 和 createSyContact 有 if 检查。需要先执行createClientWithPhones,它会return clientId。此 clientId 应在 createReferral 和 createSyContact 中使用。
public Mono<ServerResponse> createClientProfile(ServerRequest request) {
final List<Long> clinetIdList = new ArrayList<>();
createClientWithPhones(request).subscribe(result -> {
clinetIdList.add(result.longValue());
createSyContact(request, result.longValue());
createReferral(request, result.longValue());
});
return ServerResponse
.created(URI.create(String.format(CLIENT_URI_FORMAT,
clinetIdList.get(0))))
.contentType(APPLICATION_JSON).build();
}
是这样处理的吗?
例如,可以在 flatMap
中完成一个简单的 if 语句,然后执行。
public Mono<String> foobar() {
return Mono.just("foo").flatMap(value -> {
if(value != null)
return Mono.just("Has value");
else
return Mono.empty();
}
}
foobar()
.switchIfEmpty(Mono.just("Is empty"))
.subscribe(output -> System.out.println(output);
好吧,我认为人们对响应式库没有很好的理解。我的意思是,通常人们接近 Java 8 流,因为他们正在尝试进行函数式编程。当然反应式库是基于函数式编程的,但我认为目的是围绕阻塞进行异步I/O。考虑 WebFlux 项目的(当前)首页。
What is reactive processing?
Reactive processing is a paradigm that enables developers build non-blocking, asynchronous applications that can handle back-pressure (flow control).
所以,这是一种冗长的说法,我认为最好关注 I/O 发生的地方,而不是创建功能代码。如果你需要 if
语句,那么你需要 if
语句。与其尝试弄清楚如何使用函数式编程执行 if
语句,不如尝试找出 I/O 发生的位置并以异步方式处理它。我喜欢使用的一个“技巧”是 Mono::zip
或 Flux::zip
。这些函数将许多 I/O 调用合并到一个发布者中,然后 return 发送给客户端。因此,请考虑此示例代码。
让我们创建一些响应式 r2dbc 函数:
Mono<Client> save(Client client) {
client.id = 1L;
System.out.println("Save client: " + client.id);
return Mono.just(client);
}
Mono<Phone> save(Phone phone) {
System.out.println("Save phone: " + phone.clientId);
return Mono.just(phone);
}
Mono<Referral> save(Referral referral) {
System.out.println("Save referral: " + referral.clientId);
return Mono.just(referral);
}
Mono<Contact> save(Contact contact) {
System.out.println("Save contact: " + contact.clientId);
return Mono.just(contact);
}
我们需要一些示例 类 来使用:
class DTO {
Client client;
List<Phone> phones;
Optional<Contact> contact;
Optional<Referral> referral;
}
class Client {
Long id;
}
class Contact {
Long clientId;
}
class Referral {
Long clientId;
}
class Phone {
Long clientId;
}
我们的输入可能是一个 Mono<DTO>
,因为这是 Request 应该提供的,所以我们的 Service
层需要从那个开始,return 一个 Mono<Long>
客户端 ID。
Mono<Long> doWork(Mono<DTO> monoDto) {
return monoDto.flatMap(dto->{
return save(dto.client).flatMap(client->{
List<Mono<?>> publishers = new ArrayList<>();
dto.phones.forEach(phone->{
phone.clientId = client.id;
publishers.add(save(phone));
});
if ( dto.contact.isPresent()) {
Contact c = dto.contact.get();
c.clientId = client.id;
publishers.add(save(c));
}
if ( dto.referral.isPresent()) {
Referral r = dto.referral.get();
r.clientId = client.id;
publishers.add(save(r));
}
if ( publishers.size() > 0 )
return Mono.zip(publishers, obs->client.id);
else
return Mono.just(client.id);
});
});
}
我 运行 使用以下示例代码:
@Override
public void run(ApplicationArguments args) throws Exception {
saveClient(new Client(), null, null, null).subscribe(System.out::println);
saveClient(new Client(), new Phone(), null, null).subscribe(System.out::println);
saveClient(new Client(), new Phone(), new Contact(), null).subscribe(System.out::println);
saveClient(new Client(), new Phone(), new Contact(), new Referral()).subscribe(System.out::println);
}
private Mono<Long> saveClient(Client client, Phone phone, Contact contact,
Referral referral) {
// TODO Auto-generated method stub
DTO dto = new DTO();
dto.client = client;
dto.phones = new ArrayList<>();
if ( phone != null ) dto.phones.add(phone);
dto.contact = Optional.ofNullable(contact);
dto.referral = Optional.ofNullable(referral);
return doWork(Mono.just(dto));
}
因此,这使用了 Mono.zip
技巧。保存的客户端是平面映射的,因此首先完成。然后为需要完成的所有后续保存创建单声道列表。这些monos都是由Mono.zip函数异步执行的。 “combiner”函数对结果没有任何作用,它只是 returns clientId,这是客户端想要的。 Mono.zip 将所有 Monos 组合成一个单一的 Mono 以 return 发送给客户端。从某种意义上说,这只是采用过程代码并将其包装在响应式库中,而不是过分关注函数式编程。如果业务“流程”发生变化,这很容易阅读和修改。
如果你喜欢,这是一个起点。我没有使用 Repository::saveAll
,所以这可能是一个改进。
确保所有 Flux
和 Mono
发布商都链接在一起很重要。在你的最后一个例子中,你似乎放弃了它们。仅仅创建它们是不够的,它们都必须以某种方式 return 发送给客户端。此外,您的代码有一个 subscribe
调用,这是一个禁忌。只有客户应该订阅。我认为你应该在那里使用 map
。
编辑:修复了一个错误。仔细检查你的代码。
我有一个 Spring Webflux 响应式服务,它接收一个 DTO 并将其插入多个 table。 有时我们可能需要根据传入的 DTO 跳过插入某些 tables。
这些是要求:
- 创建新客户
- 如果 DTO 中存在客户推荐,则创建新的客户推荐。
- 如果 DTO 中存在,则创建客户端次要联系人
- 如果 DTO 中存在,则创建客户端电话
问题:-
- 不确定如何在反应流中应用 if 条件。
- 有更好的方法吗?
- 这里,除了第一个,所有其他操作都可以运行并行。
public Mono<ServerResponse> createClientProfile(ServerRequest request) {
return secContext.retrieveUser().flatMap(usr -> {
return request.bodyToMono(ClientDto.class).flatMap(client -> {
return toNewClient(client, usr).flatMap(clientRepository::save).flatMap(clientRes -> {
return toNewClientReferral(clientRes.getClientId(), client.getDiscount(), usr)
.flatMap(clientReferralRepository::save).flatMap(clientReferralRes -> {
return toNewClientSyContact(clientRes.getClientId(), client.getSecondary(), usr)
.flatMap(clientSyContactRepository::save).flatMap(clientSyContactRes -> {
return clientPhoneRepository
.saveAll(toNewClientPhone(clientRes.getClientId(), client.getPhones(), usr))
.collectList().flatMap(phoneRes -> {
return ServerResponse
.created(URI.create(String.format(CLIENT_URI_FORMAT,
clientRes.getClientId())))
.contentType(APPLICATION_JSON).build();
});
});
});
});
});
});
}
private Mono<Referral> toNewClientReferral(final long clientId, final Discount dto) {
Referral referral = Referral.of(clientId,
dto.getName(), dto.getType(), dto.getAmount(), dto.getStatus());
return Mono.just(referral);
}
client.getDiscount() can be null,
client.getSecondary() can be null, client.getPhones() can be empty.
我用 3 种不同的方法分离了流程。
public void createSyContact(ServerRequest request, long clientId) {
secContext.retrieveUser().flatMap(usr -> {
return request.bodyToMono(ClientDto.class).flatMap(client -> {
if (client.getSecondary() != null) {
return toNewClientSyContact(clientId, client.getSecondary(), usr)
.flatMap(clientSyContactRepository::save).flatMap(clientRes -> {
return Mono.just(clientRes.getClientId());
});
} else {
return Mono.empty();
}
});
});
}
public void createReferral(ServerRequest request, long clientId) {
secContext.retrieveUser().flatMap(usr -> {
return request.bodyToMono(ClientDto.class).flatMap(client -> {
if (client.getDiscount() != null) {
return toNewClientReferral(clientId, client.getDiscount(), usr)
.flatMap(clientReferralRepository::save).flatMap(clientRes -> {
return Mono.just(clientRes.getClientId());
});
} else {
return Mono.empty();
}
});
});
}
public Mono<Long> createClientWithPhones(ServerRequest request) {
return secContext.retrieveUser().flatMap(usr -> {
return request.bodyToMono(ClientDto.class).flatMap(client -> {
return toNewClient(client, usr).flatMap(clientRepository::save).flatMap(clientRes -> {
return clientPhoneRepository
.saveAll(toNewClientPhone(clientRes.getClientId(), client.getPhones(), usr)).collectList()
.flatMap(phoneRes -> {
return Mono.just(clientRes.getClientId());
});
});
});
});
}
在这里,createClientWithPhones 是强制性的,所以没有 if 检查那里。但是其他 2 个方法 createReferral 和 createSyContact 有 if 检查。需要先执行createClientWithPhones,它会return clientId。此 clientId 应在 createReferral 和 createSyContact 中使用。
public Mono<ServerResponse> createClientProfile(ServerRequest request) {
final List<Long> clinetIdList = new ArrayList<>();
createClientWithPhones(request).subscribe(result -> {
clinetIdList.add(result.longValue());
createSyContact(request, result.longValue());
createReferral(request, result.longValue());
});
return ServerResponse
.created(URI.create(String.format(CLIENT_URI_FORMAT,
clinetIdList.get(0))))
.contentType(APPLICATION_JSON).build();
}
是这样处理的吗?
例如,可以在 flatMap
中完成一个简单的 if 语句,然后执行。
public Mono<String> foobar() {
return Mono.just("foo").flatMap(value -> {
if(value != null)
return Mono.just("Has value");
else
return Mono.empty();
}
}
foobar()
.switchIfEmpty(Mono.just("Is empty"))
.subscribe(output -> System.out.println(output);
好吧,我认为人们对响应式库没有很好的理解。我的意思是,通常人们接近 Java 8 流,因为他们正在尝试进行函数式编程。当然反应式库是基于函数式编程的,但我认为目的是围绕阻塞进行异步I/O。考虑 WebFlux 项目的(当前)首页。
What is reactive processing? Reactive processing is a paradigm that enables developers build non-blocking, asynchronous applications that can handle back-pressure (flow control).
所以,这是一种冗长的说法,我认为最好关注 I/O 发生的地方,而不是创建功能代码。如果你需要 if
语句,那么你需要 if
语句。与其尝试弄清楚如何使用函数式编程执行 if
语句,不如尝试找出 I/O 发生的位置并以异步方式处理它。我喜欢使用的一个“技巧”是 Mono::zip
或 Flux::zip
。这些函数将许多 I/O 调用合并到一个发布者中,然后 return 发送给客户端。因此,请考虑此示例代码。
让我们创建一些响应式 r2dbc 函数:
Mono<Client> save(Client client) {
client.id = 1L;
System.out.println("Save client: " + client.id);
return Mono.just(client);
}
Mono<Phone> save(Phone phone) {
System.out.println("Save phone: " + phone.clientId);
return Mono.just(phone);
}
Mono<Referral> save(Referral referral) {
System.out.println("Save referral: " + referral.clientId);
return Mono.just(referral);
}
Mono<Contact> save(Contact contact) {
System.out.println("Save contact: " + contact.clientId);
return Mono.just(contact);
}
我们需要一些示例 类 来使用:
class DTO {
Client client;
List<Phone> phones;
Optional<Contact> contact;
Optional<Referral> referral;
}
class Client {
Long id;
}
class Contact {
Long clientId;
}
class Referral {
Long clientId;
}
class Phone {
Long clientId;
}
我们的输入可能是一个 Mono<DTO>
,因为这是 Request 应该提供的,所以我们的 Service
层需要从那个开始,return 一个 Mono<Long>
客户端 ID。
Mono<Long> doWork(Mono<DTO> monoDto) {
return monoDto.flatMap(dto->{
return save(dto.client).flatMap(client->{
List<Mono<?>> publishers = new ArrayList<>();
dto.phones.forEach(phone->{
phone.clientId = client.id;
publishers.add(save(phone));
});
if ( dto.contact.isPresent()) {
Contact c = dto.contact.get();
c.clientId = client.id;
publishers.add(save(c));
}
if ( dto.referral.isPresent()) {
Referral r = dto.referral.get();
r.clientId = client.id;
publishers.add(save(r));
}
if ( publishers.size() > 0 )
return Mono.zip(publishers, obs->client.id);
else
return Mono.just(client.id);
});
});
}
我 运行 使用以下示例代码:
@Override
public void run(ApplicationArguments args) throws Exception {
saveClient(new Client(), null, null, null).subscribe(System.out::println);
saveClient(new Client(), new Phone(), null, null).subscribe(System.out::println);
saveClient(new Client(), new Phone(), new Contact(), null).subscribe(System.out::println);
saveClient(new Client(), new Phone(), new Contact(), new Referral()).subscribe(System.out::println);
}
private Mono<Long> saveClient(Client client, Phone phone, Contact contact,
Referral referral) {
// TODO Auto-generated method stub
DTO dto = new DTO();
dto.client = client;
dto.phones = new ArrayList<>();
if ( phone != null ) dto.phones.add(phone);
dto.contact = Optional.ofNullable(contact);
dto.referral = Optional.ofNullable(referral);
return doWork(Mono.just(dto));
}
因此,这使用了 Mono.zip
技巧。保存的客户端是平面映射的,因此首先完成。然后为需要完成的所有后续保存创建单声道列表。这些monos都是由Mono.zip函数异步执行的。 “combiner”函数对结果没有任何作用,它只是 returns clientId,这是客户端想要的。 Mono.zip 将所有 Monos 组合成一个单一的 Mono 以 return 发送给客户端。从某种意义上说,这只是采用过程代码并将其包装在响应式库中,而不是过分关注函数式编程。如果业务“流程”发生变化,这很容易阅读和修改。
如果你喜欢,这是一个起点。我没有使用 Repository::saveAll
,所以这可能是一个改进。
确保所有 Flux
和 Mono
发布商都链接在一起很重要。在你的最后一个例子中,你似乎放弃了它们。仅仅创建它们是不够的,它们都必须以某种方式 return 发送给客户端。此外,您的代码有一个 subscribe
调用,这是一个禁忌。只有客户应该订阅。我认为你应该在那里使用 map
。
编辑:修复了一个错误。仔细检查你的代码。