使用来自 flux 的数据将 flux 减少为单声道
Reduce a flux to mono using data from flux
我有这个场景。我有一个分页 API,它提供了过去 12 个月的数据。 API 的响应如下:
public class PagedTransfersDto {
private List<Transfer> content;
private Page page;
@Getter
public static class Transfer {
private String id;
private Long transferId;
private Long transferRequestId;
private String status;
private BigDecimal accountReceivable;
private BigDecimal accountPayable;
private BigDecimal netReceivable;
private BigDecimal netPayable;
private String currency;
private Long transferDate;
}
@Getter
public static class Page {
private Integer size;
private Integer number;
private Integer totalElements;
private Integer totalPages;
}
}
现在我必须收集所有数据,然后计算所有 netReceivable
和 return 的总和作为 Mono<CompanyIncome>
。这个pojo就像
public class CompanyIncome {
private BigDecimal inferredIncome = new BigDecimal(0);
}
为此我写了类似的东西:
CompanyIncome initialIncome = new CompanyIncome();
return myService.getTransfers(0, 50, fromDate, toDate)
.expand(pagedTransfersDto -> {
if (pagedTransfersDto.getPage().getNumber().equals(pagedTransfersDto.getPage().getTotalPages())) {
return Mono.empty();
}
return myService.getTransfers(pagedTransfersDto.getPage().getNumber() + 1, 50, fromDate, toDate);
})
.flatMap(pagedTransfersDto -> Flux.fromIterable(pagedTransfersDto.getContent()))
.reduce(initialIncome, ((companyIncome, transfer) -> {
companyIncome.setInferredIncome(companyIncome.getInferredIncome().add(transfer.getNetReceivable()));
return companyIncome;
}));
现在要注意的是,这个数据可能只有 3 个月,在这种情况下,我必须通过乘以 4 将其推断为 12 个月。
我想的是获取transfers列表的第一项和最后一项,看看数据是不是整年的,但是想不出在什么地方进行这个操作。
因为减少传输数据后就没有了。在此之前,我似乎无法找到一种方法来获取此信息并仍然减少传输通量
我对反应方式有点陌生,似乎无法找到一种方法来做到这一点。任何帮助将不胜感激。谢谢
为此,最好的解决方案是在简化对象中存储必要的“元数据”。您已经有一个 CompanyIncome
对象,所以也许那是个好地方?否则我会引入一个 Tuple2
或一些中间业务对象(例如 CompanyIncomeAggregator
),在其中存储汇总收入和您需要在最后决定是否需要进一步处理的信息.
然后在 map
步骤中,您将阅读该信息,对其采取行动,然后 return 按原样计算的收入或根据您的标准进行修改。
重要说明:使用反应链外部的变量是一种代码味道,因为它引入了泄漏的共享状态:如果对同一个 Mono 进行了两个订阅,它们将在同一个 CompanyIncome
对象上工作.您可以在此处使用 reduceWith
进行补救,初始值采用 Supplier
:reduceWith(CompanyIncome::new, ...)
.
我有这个场景。我有一个分页 API,它提供了过去 12 个月的数据。 API 的响应如下:
public class PagedTransfersDto {
private List<Transfer> content;
private Page page;
@Getter
public static class Transfer {
private String id;
private Long transferId;
private Long transferRequestId;
private String status;
private BigDecimal accountReceivable;
private BigDecimal accountPayable;
private BigDecimal netReceivable;
private BigDecimal netPayable;
private String currency;
private Long transferDate;
}
@Getter
public static class Page {
private Integer size;
private Integer number;
private Integer totalElements;
private Integer totalPages;
}
}
现在我必须收集所有数据,然后计算所有 netReceivable
和 return 的总和作为 Mono<CompanyIncome>
。这个pojo就像
public class CompanyIncome {
private BigDecimal inferredIncome = new BigDecimal(0);
}
为此我写了类似的东西:
CompanyIncome initialIncome = new CompanyIncome();
return myService.getTransfers(0, 50, fromDate, toDate)
.expand(pagedTransfersDto -> {
if (pagedTransfersDto.getPage().getNumber().equals(pagedTransfersDto.getPage().getTotalPages())) {
return Mono.empty();
}
return myService.getTransfers(pagedTransfersDto.getPage().getNumber() + 1, 50, fromDate, toDate);
})
.flatMap(pagedTransfersDto -> Flux.fromIterable(pagedTransfersDto.getContent()))
.reduce(initialIncome, ((companyIncome, transfer) -> {
companyIncome.setInferredIncome(companyIncome.getInferredIncome().add(transfer.getNetReceivable()));
return companyIncome;
}));
现在要注意的是,这个数据可能只有 3 个月,在这种情况下,我必须通过乘以 4 将其推断为 12 个月。
我想的是获取transfers列表的第一项和最后一项,看看数据是不是整年的,但是想不出在什么地方进行这个操作。
因为减少传输数据后就没有了。在此之前,我似乎无法找到一种方法来获取此信息并仍然减少传输通量
我对反应方式有点陌生,似乎无法找到一种方法来做到这一点。任何帮助将不胜感激。谢谢
为此,最好的解决方案是在简化对象中存储必要的“元数据”。您已经有一个 CompanyIncome
对象,所以也许那是个好地方?否则我会引入一个 Tuple2
或一些中间业务对象(例如 CompanyIncomeAggregator
),在其中存储汇总收入和您需要在最后决定是否需要进一步处理的信息.
然后在 map
步骤中,您将阅读该信息,对其采取行动,然后 return 按原样计算的收入或根据您的标准进行修改。
重要说明:使用反应链外部的变量是一种代码味道,因为它引入了泄漏的共享状态:如果对同一个 Mono 进行了两个订阅,它们将在同一个 CompanyIncome
对象上工作.您可以在此处使用 reduceWith
进行补救,初始值采用 Supplier
:reduceWith(CompanyIncome::new, ...)
.