Project Reactor 的 Flux 在处理错误期间获取失败的项目
Project Reactor's Flux get failed item during handling error
我需要处理 Flux
流错误,为此我需要知道具体是什么项目失败了。似乎方法 doOnError
应该适合处理错误,但我只能得到异常而不是失败的项目。有没有办法同时获得失败项和异常?
private void testFluxIterableFlow() {
Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
.map(this::process)
.doOnError(ex -> {
...
})
.doOnNext(processedValue -> ...)
.subscribe();
}
private String process(Integer value) {
if (value == 4) {
throw new RuntimeException("error...");
}
return "processed " + value;
}
在此示例中,我需要在错误处理程序中接收失败的项目 4
和异常消息。
如果某个值导致了异常,您应该将该值视为异常原因。
由您决定是否值得添加新的异常类型,
@RequiredArgsConstructor
public class WrongValueInStreamException extends RuntimeException {
@Getter
private final Object wrongValue;
}
public class StreamProcessor {
public String process(Integer value) {
if (value == 4) {
throw new WrongValueInStreamException(4);
}
return "processed " + value;
}
}
但只要异常传达有用的相关信息,这就是一个好习惯:
.doOnError(WrongValueInStreamException.class::isInstance, e -> {
final Object value = ((WrongValueInStreamException) e).getWrongValue();
// use 'value'
})
这里有点晚了,但万一其他人需要它...您可能想要做的是改用 onErrorContinue。请注意,此方法将从通量中删除错误项并继续处理,因此请务必牢记这一点。如果你想要错误 + 项目并停止一切,我相信如果你从这个方法中重新抛出异常,并且没有其他下游 onErrorContinue,它应该终止通量。这是我使用 kafka reactive api
的一个例子
此外,传递给 onErrorContinue 的项目将不是 flux 中的原始项目,而是在反应链中的步骤中尝试处理的对象。例如,如果链在尝试转换 B -> C 时失败,但在转换 A -> B 之后,您将在处理程序中获得的对象将是 B。
@EventListener(ApplicationReadyEvent.class)
public void process() {
receiver.receiveAutoAck()
.concatMap(Function.identity())
.flatMap(this::deserialzeRecord)
.flatMap(this::processEvent)
.flatMap(responseSender::send)
.onErrorContinue(this::handleRecordError)
.subscribe();
}
private void handleRecordError(Throwable throwable, Object record) {
log.error("Received error processing record={}", record, throwable);
}
如果你想终止 flux 并丢弃其余的项目就这样做
private void handleRecordError(Throwable throwable, Object record) {
log.error("Received error processing record={}", record, throwable);
throw throwable;
}
我需要处理 Flux
流错误,为此我需要知道具体是什么项目失败了。似乎方法 doOnError
应该适合处理错误,但我只能得到异常而不是失败的项目。有没有办法同时获得失败项和异常?
private void testFluxIterableFlow() {
Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
.map(this::process)
.doOnError(ex -> {
...
})
.doOnNext(processedValue -> ...)
.subscribe();
}
private String process(Integer value) {
if (value == 4) {
throw new RuntimeException("error...");
}
return "processed " + value;
}
在此示例中,我需要在错误处理程序中接收失败的项目 4
和异常消息。
如果某个值导致了异常,您应该将该值视为异常原因。
由您决定是否值得添加新的异常类型,
@RequiredArgsConstructor
public class WrongValueInStreamException extends RuntimeException {
@Getter
private final Object wrongValue;
}
public class StreamProcessor {
public String process(Integer value) {
if (value == 4) {
throw new WrongValueInStreamException(4);
}
return "processed " + value;
}
}
但只要异常传达有用的相关信息,这就是一个好习惯:
.doOnError(WrongValueInStreamException.class::isInstance, e -> {
final Object value = ((WrongValueInStreamException) e).getWrongValue();
// use 'value'
})
这里有点晚了,但万一其他人需要它...您可能想要做的是改用 onErrorContinue。请注意,此方法将从通量中删除错误项并继续处理,因此请务必牢记这一点。如果你想要错误 + 项目并停止一切,我相信如果你从这个方法中重新抛出异常,并且没有其他下游 onErrorContinue,它应该终止通量。这是我使用 kafka reactive api
的一个例子此外,传递给 onErrorContinue 的项目将不是 flux 中的原始项目,而是在反应链中的步骤中尝试处理的对象。例如,如果链在尝试转换 B -> C 时失败,但在转换 A -> B 之后,您将在处理程序中获得的对象将是 B。
@EventListener(ApplicationReadyEvent.class)
public void process() {
receiver.receiveAutoAck()
.concatMap(Function.identity())
.flatMap(this::deserialzeRecord)
.flatMap(this::processEvent)
.flatMap(responseSender::send)
.onErrorContinue(this::handleRecordError)
.subscribe();
}
private void handleRecordError(Throwable throwable, Object record) {
log.error("Received error processing record={}", record, throwable);
}
如果你想终止 flux 并丢弃其余的项目就这样做
private void handleRecordError(Throwable throwable, Object record) {
log.error("Received error processing record={}", record, throwable);
throw throwable;
}