从 spring 批处理器调用 Async REST api
Calling Async REST api from spring batch processor
我编写了一个 spring 处理列表列表的批处理作业。
Reader returns 列表列表。
处理器处理每个 ListItem 和 returns 已处理的列表。
Writer 从 List of List 向 DB 和 sftp 写入内容。
我有一个用例,我从 spring 批处理器调用 Async REST api。
在 ListenableFuture 响应中,我实现了 LitenableFutureCallback 来处理成功和失败,这按预期工作,但在异步调用 returns 之前,ItemProcessor 不会等待来自异步 api 和 returns 的回调对象(列表)到作者。
我不确定如何实现和处理来自 ItemProcessor 的异步调用。
我读过有关 AsyncItemProcessor 和 AsyncItemWriter 的内容,但我不确定在这种情况下我是否应该使用它。
我还考虑过在 AsyncRestTemplate 的 ListenableFuture 响应上调用 get(),但根据文档,它将阻塞当前线程,直到它收到响应。
我正在寻求一些关于如何实现它的帮助。下面的代码片段:
处理器:
public class MailDocumentProcessor implements ItemProcessor<List<MailingDocsEntity>, List<MailingDocsEntity>> {
... Initialization code
@Override
public List<MailingDocsEntity> process(List<MailingDocsEntity> documentsList) throws Exception {
logger.info("Entering MailingDocsEntity processor");
List<MailingDocsEntity> synchronizedList = Collections.synchronizedList(documentsList);
for (MailingDocsEntity mailingDocsEntity : synchronizedList) {
System.out.println("Reading Mailing id: " + mailingDocsEntity.getMailingId());
..code to get the file
//If the file is not a pdf convert it
String fileExtension = readFromSpResponse.getFileExtension();
String fileName = readFromSpResponse.getFileName();
byte[] fileBytes = readFromSpResponse.getByteArray();
try {
//Do checks to make sure PDF file is being sent
if (!"pdf".equalsIgnoreCase(fileExtension)) {
//Only doc, docx and xlsx conversions are supported
...Building REquest object
//make async call to pdf conversion service
pdfService.convertDocxToPdf(request, mailingDocsEntity);
} else {
logger.error("The file cannot be converted to a pdf.\n"
);
}
}
} catch (Exception ex){
logger.error("There has been an exception while processing data", ex);
}
}
return synchronizedList;
}
}
异步 PdfConversion 服务 Class:
@Service
public class PdfService{
@Autowired
@Qualifier("MicroServiceAsyncRestTemplate")
AsyncRestTemplate microServiceAsyncRestTemplate;
public ConvertDocxToPdfResponse convertDocxToPdf(ConvertDocxToPdfRequest request, MailingDocsEntity mailingDocsEntity){
ConvertDocxToPdfResponse pdfResponse = new ConvertDocxToPdfResponse();
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<?> entity = new HttpEntity<>(request, headers);
ListenableFuture<ResponseEntity<ConvertDocxToPdfResponse>> microServiceResponse = microServiceAsyncRestTemplate.postForEntity(batchMailProcessingConfiguration.getPdfUrl(), entity, ConvertDocxToPdfResponse.class);
ConvertDocxToPdfResponse resultBody = microServiceResponse.get().getBody();
microServiceResponse.addCallback(new ListenableFutureCallback<ResponseEntity<ConvertDocxToPdfResponse>>() {
@Override
public void onSuccess(ResponseEntity<ConvertDocxToPdfResponse> result) {
...code to do stuff on success
}
@Override
public void onFailure(Throwable ex) {
pdfResponse.setMessage("Exception while retrieving response");
}
});
} catch (Exception e) {
String message = "There has been an error while issuing a pdf generate request to the pdf micro service";
pdfResponse.setMessage(message);
logger.error(message, e);
}
return pdfResponse;
}
}
我原来的批处理作业是同步的,我正在转换为异步以加快处理速度。
我确实尝试寻找类似的问题,但找不到足够的信息。
非常感谢任何指点或帮助。
谢谢!!
I did read about AsyncItemProcessor and AsyncItemWriter, but I am not sure if that is something I should use in this scenario.
是的,AsyncItemProcessor
和 AsyncItemWriter
适合您的用例。 AsyncItemProcessor
将为新线程上的项目执行委托 ItemProcessor
的逻辑(您的其余调用)。项目完成后,结果的 Future
将传递给要写入的 AsynchItemWriter
。然后 AsynchItemWriter
将打开 Future
并写入项目。这些组件的优点是您不必自己处理 Future
的包装、解包等。
您可以找到:
- 此处有更多详细信息:https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#asynchronous-processors
- 此处示例:https://github.com/mminella/scaling-demos/blob/master/single-jvm-demos/src/main/java/io/spring/batch/scalingdemos/asyncprocessor/AsyncProcessorJobApplication.java
希望这对您有所帮助。
我编写了一个 spring 处理列表列表的批处理作业。
Reader returns 列表列表。 处理器处理每个 ListItem 和 returns 已处理的列表。 Writer 从 List of List 向 DB 和 sftp 写入内容。
我有一个用例,我从 spring 批处理器调用 Async REST api。 在 ListenableFuture 响应中,我实现了 LitenableFutureCallback 来处理成功和失败,这按预期工作,但在异步调用 returns 之前,ItemProcessor 不会等待来自异步 api 和 returns 的回调对象(列表)到作者。
我不确定如何实现和处理来自 ItemProcessor 的异步调用。
我读过有关 AsyncItemProcessor 和 AsyncItemWriter 的内容,但我不确定在这种情况下我是否应该使用它。
我还考虑过在 AsyncRestTemplate 的 ListenableFuture 响应上调用 get(),但根据文档,它将阻塞当前线程,直到它收到响应。
我正在寻求一些关于如何实现它的帮助。下面的代码片段:
处理器:
public class MailDocumentProcessor implements ItemProcessor<List<MailingDocsEntity>, List<MailingDocsEntity>> {
... Initialization code
@Override
public List<MailingDocsEntity> process(List<MailingDocsEntity> documentsList) throws Exception {
logger.info("Entering MailingDocsEntity processor");
List<MailingDocsEntity> synchronizedList = Collections.synchronizedList(documentsList);
for (MailingDocsEntity mailingDocsEntity : synchronizedList) {
System.out.println("Reading Mailing id: " + mailingDocsEntity.getMailingId());
..code to get the file
//If the file is not a pdf convert it
String fileExtension = readFromSpResponse.getFileExtension();
String fileName = readFromSpResponse.getFileName();
byte[] fileBytes = readFromSpResponse.getByteArray();
try {
//Do checks to make sure PDF file is being sent
if (!"pdf".equalsIgnoreCase(fileExtension)) {
//Only doc, docx and xlsx conversions are supported
...Building REquest object
//make async call to pdf conversion service
pdfService.convertDocxToPdf(request, mailingDocsEntity);
} else {
logger.error("The file cannot be converted to a pdf.\n"
);
}
}
} catch (Exception ex){
logger.error("There has been an exception while processing data", ex);
}
}
return synchronizedList;
}
}
异步 PdfConversion 服务 Class:
@Service
public class PdfService{
@Autowired
@Qualifier("MicroServiceAsyncRestTemplate")
AsyncRestTemplate microServiceAsyncRestTemplate;
public ConvertDocxToPdfResponse convertDocxToPdf(ConvertDocxToPdfRequest request, MailingDocsEntity mailingDocsEntity){
ConvertDocxToPdfResponse pdfResponse = new ConvertDocxToPdfResponse();
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<?> entity = new HttpEntity<>(request, headers);
ListenableFuture<ResponseEntity<ConvertDocxToPdfResponse>> microServiceResponse = microServiceAsyncRestTemplate.postForEntity(batchMailProcessingConfiguration.getPdfUrl(), entity, ConvertDocxToPdfResponse.class);
ConvertDocxToPdfResponse resultBody = microServiceResponse.get().getBody();
microServiceResponse.addCallback(new ListenableFutureCallback<ResponseEntity<ConvertDocxToPdfResponse>>() {
@Override
public void onSuccess(ResponseEntity<ConvertDocxToPdfResponse> result) {
...code to do stuff on success
}
@Override
public void onFailure(Throwable ex) {
pdfResponse.setMessage("Exception while retrieving response");
}
});
} catch (Exception e) {
String message = "There has been an error while issuing a pdf generate request to the pdf micro service";
pdfResponse.setMessage(message);
logger.error(message, e);
}
return pdfResponse;
}
}
我原来的批处理作业是同步的,我正在转换为异步以加快处理速度。 我确实尝试寻找类似的问题,但找不到足够的信息。 非常感谢任何指点或帮助。
谢谢!!
I did read about AsyncItemProcessor and AsyncItemWriter, but I am not sure if that is something I should use in this scenario.
是的,AsyncItemProcessor
和 AsyncItemWriter
适合您的用例。 AsyncItemProcessor
将为新线程上的项目执行委托 ItemProcessor
的逻辑(您的其余调用)。项目完成后,结果的 Future
将传递给要写入的 AsynchItemWriter
。然后 AsynchItemWriter
将打开 Future
并写入项目。这些组件的优点是您不必自己处理 Future
的包装、解包等。
您可以找到:
- 此处有更多详细信息:https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#asynchronous-processors
- 此处示例:https://github.com/mminella/scaling-demos/blob/master/single-jvm-demos/src/main/java/io/spring/batch/scalingdemos/asyncprocessor/AsyncProcessorJobApplication.java
希望这对您有所帮助。