如何让 Mono 等待 dependenat fetch 方法具有 运行
How do I get a Mono to wait till dependenat fetch method has run
我正在尝试通过使用 webflux 的 Web 服务实现导出到 excel 功能,因为其他 api 和控制器运行良好。我的问题是调用生成 excel 文件的函数是在从存储库中以 Flux 形式检索数据后访问的(没有问题)。我对结果进行了排序,并试图通过 flatMap 调用另一个填充方法,我遇到了很多问题试图让它工作并确保 flatMap 中的代码在 webservice 中的代码之前运行 return 文件。
下面是网络服务的代码:
@GetMapping(API_BASE_PATH + "/download")
public ResponseEntity<byte[]> download() {
Mono<Void> createExcel = excelExport.createDocument(false);
Mono.when(createExcel).log("Excel Created").then();
Workbook workbook = excelExport.getWb();
OutputStream outputStream = new ByteArrayOutputStream();
try {
workbook.write(outputStream);
} catch (IOException e) {
e.printStackTrace();
}
byte[] media = ((ByteArrayOutputStream) outputStream).toByteArray();
HttpHeaders headers = new HttpHeaders();
headers.setCacheControl(CacheControl.noCache().getHeaderValue());
headers.setContentType(MediaType.valueOf("text/html"));
headers.set("Content-disposition", "attachment; filename=filename.xlsx");
ResponseEntity<byte[]> responseEntity = new ResponseEntity<>(media, headers, HttpStatus.OK);
return responseEntity;
}
以及 exelExport 的代码 class:
public Mono<Void> createDocument(boolean all) {
InputStream inputStream = new ClassPathResource("Timesheet Template.xlsx").getInputStream();
try {
wb = WorkbookFactory.create(inputStream);
Sheet sheet = wb.getSheetAt(0);
Row row = sheet.getRow(1);
Cell cell = row.getCell(3);
if (cell == null)
cell = row.createCell(3);
cell.setCellType(CellType.STRING);
cell.setCellValue("a test");
log.info("Created document");
Flux<TimeKeepingEntry> entries = service.findByMonth(LocalDate.now().getMonth().getDisplayName(TextStyle.FULL, Locale.ENGLISH)).log("Excel Export - retrievedMonths");
entries.subscribe();
return entries.groupBy(TimeKeepingEntry::getDateOfMonth).flatMap(Flux::collectList).flatMap(timeKeepingEntries -> this.populateEntry(sheet, timeKeepingEntries)).then();
} catch (IOException e) {
log.error("Error Creating Document", e);
}
//should never get here
return Mono.empty();
}
private void populateEntry(Sheet sheet, List<TimeKeepingEntry> timeKeepingEntries) {
int rowNum = 0;
for (int i = 0; i < timeKeepingEntries.size(); i++) {
TimeKeepingEntry timeKeepingEntry = timeKeepingEntries.get(i);
if (i == 0) {
rowNum = calculateFirstRow(timeKeepingEntry.getDay());
}
LocalDate date = timeKeepingEntry.getFullDate();
Row row2 = sheet.getRow(rowNum);
Cell cell2 = row2.getCell(1);
cell2.setCellValue(date.toString());
if (timeKeepingEntry.getDay().equals(DayOfWeek.FRIDAY.getDisplayName(TextStyle.FULL, Locale.ENGLISH))) {
rowNum = +2;
} else {
rowNum++;
}
}
}
工作簿永远不会更新,因为 populateEntry 永远不会执行。正如我所说,我尝试了多种不同的方法,包括 Mono.just 和 Mono.when,但在 webservice 方法尝试 return 之前,我似乎无法获得正确的组合来处理它文件。
任何帮助都会很棒。
Edit1:显示理想的 crateDocument 方法。
public Mono<Void> createDocument(boolean all) {
try {
InputStream inputStream = new ClassPathResource("Timesheet Template.xlsx").getInputStream();
wb = WorkbookFactory.create(inputStream);
Sheet sheet = wb.getSheetAt(0);
log.info("Created document");
if (all) {
//all entries
} else {
service.findByMonth(currentMonthName).log("Excel Export - retrievedMonths").collectSortedList(Comparator.comparing(TimeKeepingEntry::getDateOfMonth)).doOnNext(timeKeepingEntries -> {
this.populateEntry(sheet, timeKeepingEntries);
});
}
} catch (IOException e) {
log.error("Error Importing File", e);
}
return Mono.empty();
}
您的 web 服务的实现存在几个问题。
什么时候subscribe
首先,在响应式编程中,您通常必须尝试构建一个 单个 处理管道(通过调用 Mono
和 Flux
运算符以及 return 将最终结果作为 Mono
和 Flux
)。在任何情况下,您都应该让框架执行 subscribe
或至少只在该管道的末尾订阅一次。
这里您混合了两种方法:您的 createDocument
方法正确 return 和 Mono
,但它也执行 subscribe
。更糟糕的是,订阅是在一个中间步骤完成的,并且在 webservice 方法中没有订阅整个管道。
所以实际上,没有人看到管道的后半部分(从 groupBy
开始),因此它永远不会被执行(这是一个惰性 Flux
,也称为 "cold" 通量).
混合同步和异步
另一个问题又是混合使用两种方法的问题:您的 Flux
是惰性的和异步的,但是您的 Web 服务是以命令式和同步的方式编写的。
因此代码从数据库启动异步 Flux
,立即 return 到控制器并尝试从磁盘加载文件数据。
选项 1:使控制器更 Flux
面向
如果您使用 Spring MVC,您仍然可以编写这些命令式风格的控制器,但会散布在一些 WebFlux 中。在这种情况下,您可以 return Mono
或 Flux
并且 Spring MVC 会将其转换为正确的异步 Servlet 构造。但这意味着您必须将 OutputStream
和 bytes
处理变成 Mono
,使用 then
之类的东西将其链接到文档编写 Mono
/flatMap
/etc... 有点复杂
选项 2:将 Flux
变成命令式阻塞代码
另一种选择是通过在 createDocument()
Mono
上调用 block()
返回命令式和阻塞式。这将订阅它并等待它完成。之后,其余的命令式代码应该可以正常工作。
旁注
groupBy
有一个限制,如果它导致超过 256
个打开的组,它可以挂起。这里的组在到达文件末尾之前无法关闭,但幸运的是,由于您只处理一个月的数据,因此 Flux 不会超过 31
个组。
感谢@SimonBasie 的指点,我现在的工作代码如下。
@GetMapping(value = API_BASE_PATH + "/download", produces = "application/vnd.ms-excel")
public Mono<Resource> download() throws IOException {
Flux<TimeKeepingEntry> createExcel = excelExport.createDocument(false);
return createExcel.then(Mono.fromCallable(() -> {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
excelExport.getWb().write(outputStream);
return new ByteArrayResource(outputStream.toByteArray());
}));
}
public Flux<TimeKeepingEntry> createDocument(boolean all) {
Flux<TimeKeepingEntry> entries = null;
try {
InputStream inputStream = new ClassPathResource("Timesheet Template.xlsx").getInputStream();
wb = WorkbookFactory.create(inputStream);
Sheet sheet = wb.getSheetAt(0);
log.info("Created document");
if (all) {
//all entries
} else {
entries = service.findByMonth(currentMonthName).log("Excel Export - retrievedMonths").sort(Comparator.comparing(TimeKeepingEntry::getDateOfMonth)).doOnNext(timeKeepingEntry-> {
this.populateEntry(sheet, timeKeepingEntry);
});
}
} catch (IOException e) {
log.error("Error Importing File", e);
}
return entries;
}
我正在尝试通过使用 webflux 的 Web 服务实现导出到 excel 功能,因为其他 api 和控制器运行良好。我的问题是调用生成 excel 文件的函数是在从存储库中以 Flux 形式检索数据后访问的(没有问题)。我对结果进行了排序,并试图通过 flatMap 调用另一个填充方法,我遇到了很多问题试图让它工作并确保 flatMap 中的代码在 webservice 中的代码之前运行 return 文件。
下面是网络服务的代码:
@GetMapping(API_BASE_PATH + "/download")
public ResponseEntity<byte[]> download() {
Mono<Void> createExcel = excelExport.createDocument(false);
Mono.when(createExcel).log("Excel Created").then();
Workbook workbook = excelExport.getWb();
OutputStream outputStream = new ByteArrayOutputStream();
try {
workbook.write(outputStream);
} catch (IOException e) {
e.printStackTrace();
}
byte[] media = ((ByteArrayOutputStream) outputStream).toByteArray();
HttpHeaders headers = new HttpHeaders();
headers.setCacheControl(CacheControl.noCache().getHeaderValue());
headers.setContentType(MediaType.valueOf("text/html"));
headers.set("Content-disposition", "attachment; filename=filename.xlsx");
ResponseEntity<byte[]> responseEntity = new ResponseEntity<>(media, headers, HttpStatus.OK);
return responseEntity;
}
以及 exelExport 的代码 class:
public Mono<Void> createDocument(boolean all) {
InputStream inputStream = new ClassPathResource("Timesheet Template.xlsx").getInputStream();
try {
wb = WorkbookFactory.create(inputStream);
Sheet sheet = wb.getSheetAt(0);
Row row = sheet.getRow(1);
Cell cell = row.getCell(3);
if (cell == null)
cell = row.createCell(3);
cell.setCellType(CellType.STRING);
cell.setCellValue("a test");
log.info("Created document");
Flux<TimeKeepingEntry> entries = service.findByMonth(LocalDate.now().getMonth().getDisplayName(TextStyle.FULL, Locale.ENGLISH)).log("Excel Export - retrievedMonths");
entries.subscribe();
return entries.groupBy(TimeKeepingEntry::getDateOfMonth).flatMap(Flux::collectList).flatMap(timeKeepingEntries -> this.populateEntry(sheet, timeKeepingEntries)).then();
} catch (IOException e) {
log.error("Error Creating Document", e);
}
//should never get here
return Mono.empty();
}
private void populateEntry(Sheet sheet, List<TimeKeepingEntry> timeKeepingEntries) {
int rowNum = 0;
for (int i = 0; i < timeKeepingEntries.size(); i++) {
TimeKeepingEntry timeKeepingEntry = timeKeepingEntries.get(i);
if (i == 0) {
rowNum = calculateFirstRow(timeKeepingEntry.getDay());
}
LocalDate date = timeKeepingEntry.getFullDate();
Row row2 = sheet.getRow(rowNum);
Cell cell2 = row2.getCell(1);
cell2.setCellValue(date.toString());
if (timeKeepingEntry.getDay().equals(DayOfWeek.FRIDAY.getDisplayName(TextStyle.FULL, Locale.ENGLISH))) {
rowNum = +2;
} else {
rowNum++;
}
}
}
工作簿永远不会更新,因为 populateEntry 永远不会执行。正如我所说,我尝试了多种不同的方法,包括 Mono.just 和 Mono.when,但在 webservice 方法尝试 return 之前,我似乎无法获得正确的组合来处理它文件。
任何帮助都会很棒。
Edit1:显示理想的 crateDocument 方法。
public Mono<Void> createDocument(boolean all) {
try {
InputStream inputStream = new ClassPathResource("Timesheet Template.xlsx").getInputStream();
wb = WorkbookFactory.create(inputStream);
Sheet sheet = wb.getSheetAt(0);
log.info("Created document");
if (all) {
//all entries
} else {
service.findByMonth(currentMonthName).log("Excel Export - retrievedMonths").collectSortedList(Comparator.comparing(TimeKeepingEntry::getDateOfMonth)).doOnNext(timeKeepingEntries -> {
this.populateEntry(sheet, timeKeepingEntries);
});
}
} catch (IOException e) {
log.error("Error Importing File", e);
}
return Mono.empty();
}
您的 web 服务的实现存在几个问题。
什么时候subscribe
首先,在响应式编程中,您通常必须尝试构建一个 单个 处理管道(通过调用 Mono
和 Flux
运算符以及 return 将最终结果作为 Mono
和 Flux
)。在任何情况下,您都应该让框架执行 subscribe
或至少只在该管道的末尾订阅一次。
这里您混合了两种方法:您的 createDocument
方法正确 return 和 Mono
,但它也执行 subscribe
。更糟糕的是,订阅是在一个中间步骤完成的,并且在 webservice 方法中没有订阅整个管道。
所以实际上,没有人看到管道的后半部分(从 groupBy
开始),因此它永远不会被执行(这是一个惰性 Flux
,也称为 "cold" 通量).
混合同步和异步
另一个问题又是混合使用两种方法的问题:您的 Flux
是惰性的和异步的,但是您的 Web 服务是以命令式和同步的方式编写的。
因此代码从数据库启动异步 Flux
,立即 return 到控制器并尝试从磁盘加载文件数据。
选项 1:使控制器更 Flux
面向
如果您使用 Spring MVC,您仍然可以编写这些命令式风格的控制器,但会散布在一些 WebFlux 中。在这种情况下,您可以 return Mono
或 Flux
并且 Spring MVC 会将其转换为正确的异步 Servlet 构造。但这意味着您必须将 OutputStream
和 bytes
处理变成 Mono
,使用 then
之类的东西将其链接到文档编写 Mono
/flatMap
/etc... 有点复杂
选项 2:将 Flux
变成命令式阻塞代码
另一种选择是通过在 createDocument()
Mono
上调用 block()
返回命令式和阻塞式。这将订阅它并等待它完成。之后,其余的命令式代码应该可以正常工作。
旁注
groupBy
有一个限制,如果它导致超过 256
个打开的组,它可以挂起。这里的组在到达文件末尾之前无法关闭,但幸运的是,由于您只处理一个月的数据,因此 Flux 不会超过 31
个组。
感谢@SimonBasie 的指点,我现在的工作代码如下。
@GetMapping(value = API_BASE_PATH + "/download", produces = "application/vnd.ms-excel")
public Mono<Resource> download() throws IOException {
Flux<TimeKeepingEntry> createExcel = excelExport.createDocument(false);
return createExcel.then(Mono.fromCallable(() -> {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
excelExport.getWb().write(outputStream);
return new ByteArrayResource(outputStream.toByteArray());
}));
}
public Flux<TimeKeepingEntry> createDocument(boolean all) {
Flux<TimeKeepingEntry> entries = null;
try {
InputStream inputStream = new ClassPathResource("Timesheet Template.xlsx").getInputStream();
wb = WorkbookFactory.create(inputStream);
Sheet sheet = wb.getSheetAt(0);
log.info("Created document");
if (all) {
//all entries
} else {
entries = service.findByMonth(currentMonthName).log("Excel Export - retrievedMonths").sort(Comparator.comparing(TimeKeepingEntry::getDateOfMonth)).doOnNext(timeKeepingEntry-> {
this.populateEntry(sheet, timeKeepingEntry);
});
}
} catch (IOException e) {
log.error("Error Importing File", e);
}
return entries;
}