如何在 Flux 中获取导致异常的元素?
How to get an element that caused an exception in Flux?
假设我有一个 ID 数组:[9, 8, 7, 6]
.
我做了一些处理,一个元素导致抛出异常。我想以自己的方式处理这种情况(比如说记录下来),让其他元素顺其自然。
我怎么知道是哪一个?我需要在我的 onError
处理中包含此元素。
Flux.fromArray(myArray)
.flatMap(element -> {
var foo = processMyEl(element);
return anotherProcess(foo); // this returns Mono
})
.onErrorOperator(element -> handleMyError(element)) // this line is what I need
所以,据我所见,这个几乎不错的 .onErrorContinue((error, obj) ->
发出了一个错误和一个对象。
但是这个 obj
不是导致异常的 element
而是导致异常的对象。它发生在我的处理方法内部,并且不必每次都是相同类型的对象。
.onErrorReturn(...)
- 不是我想要的
.doOnError(error ->
- 没有我的元素的信息
.onErrorResume(error ->
- 同上
有人建议我可以创建自己的异常并将元素传递到那里,然后从异常中检索它。但是我应该如何抛出异常呢?
我应该使用旧的 try catch 方法吗:
Flux.fromArray(myArray)
.flatMap(el -> {
try {
var foo = processMyEl(el);
return anotherProcess(foo); // this returns Mono
} catch (Exception e) {
return Mono.error(new MyException(el));
}
})
.onErrorOperator(error -> handleMyError(error.getElement()))
看起来不太好
编辑:
不仅不好看,而且还不行。异常完全没有被捕获直接触发doOnTerminate()
并停止整个流
更新:
感谢@JEY 我在 flatMap
.
中使用了 .onErrorResume()
我还通过 Mono.defer(() -> Mono.just(processMyEl(el)))
将第一个方法转换为反应流。
请注意:使用 Mono.defer()
允许我使用 onErrorResume
因为 Mono.just()
最终代码如下所示:
Flux.fromArray(myArray)
.flatMap(element -> Mono.defer(() -> Mono.just(processMyEl(element)))
.onErrorResume(th -> handleMyError(element, th))
)
.flatMap(foo -> anotherProcess(foo)
.onErrorResume(th -> handleMyError(foo, th)
)
其中:
private Mono<> handleMyError(el, th) {
// handling code
return Mono.empty()
}
} catch (Exception e) {
throw new MyException(el, e);
}
应@Kamil 的要求,我将添加我的评论作为答案:
你应该只处理 flatMap 中的错误和 return a Mono.empty() 来丢弃它做一些像这样的事情:
Flux.fromArray(myArray)
.flatMap(el -> anotherProcess(processMyEl(el)).onErrorResume(th -> handleError(th, el))
句柄错误如:
Mono<Void> handleError(Throwable th, Object element) {
LOG.error("An error occurred on {}", element, th);
return Mono.empty()
}
或者如果你想做一些需要异步的更复杂的事情:
Mono<Void> handleError(Throwable th, Object element) {
return doSomethingThaReturnFluxOrMono(element).then();
}
假设我有一个 ID 数组:[9, 8, 7, 6]
.
我做了一些处理,一个元素导致抛出异常。我想以自己的方式处理这种情况(比如说记录下来),让其他元素顺其自然。
我怎么知道是哪一个?我需要在我的 onError
处理中包含此元素。
Flux.fromArray(myArray)
.flatMap(element -> {
var foo = processMyEl(element);
return anotherProcess(foo); // this returns Mono
})
.onErrorOperator(element -> handleMyError(element)) // this line is what I need
所以,据我所见,这个几乎不错的 .onErrorContinue((error, obj) ->
发出了一个错误和一个对象。
但是这个 obj
不是导致异常的 element
而是导致异常的对象。它发生在我的处理方法内部,并且不必每次都是相同类型的对象。
.onErrorReturn(...)
- 不是我想要的
.doOnError(error ->
- 没有我的元素的信息
.onErrorResume(error ->
- 同上
有人建议我可以创建自己的异常并将元素传递到那里,然后从异常中检索它。但是我应该如何抛出异常呢?
我应该使用旧的 try catch 方法吗:
Flux.fromArray(myArray)
.flatMap(el -> {
try {
var foo = processMyEl(el);
return anotherProcess(foo); // this returns Mono
} catch (Exception e) {
return Mono.error(new MyException(el));
}
})
.onErrorOperator(error -> handleMyError(error.getElement()))
看起来不太好
编辑:
不仅不好看,而且还不行。异常完全没有被捕获直接触发doOnTerminate()
并停止整个流
更新:
感谢@JEY 我在 flatMap
.
.onErrorResume()
我还通过 Mono.defer(() -> Mono.just(processMyEl(el)))
将第一个方法转换为反应流。
请注意:使用 Mono.defer()
允许我使用 onErrorResume
因为 Mono.just()
最终代码如下所示:
Flux.fromArray(myArray)
.flatMap(element -> Mono.defer(() -> Mono.just(processMyEl(element)))
.onErrorResume(th -> handleMyError(element, th))
)
.flatMap(foo -> anotherProcess(foo)
.onErrorResume(th -> handleMyError(foo, th)
)
其中:
private Mono<> handleMyError(el, th) {
// handling code
return Mono.empty()
}
} catch (Exception e) {
throw new MyException(el, e);
}
应@Kamil 的要求,我将添加我的评论作为答案:
你应该只处理 flatMap 中的错误和 return a Mono.empty() 来丢弃它做一些像这样的事情:
Flux.fromArray(myArray)
.flatMap(el -> anotherProcess(processMyEl(el)).onErrorResume(th -> handleError(th, el))
句柄错误如:
Mono<Void> handleError(Throwable th, Object element) {
LOG.error("An error occurred on {}", element, th);
return Mono.empty()
}
或者如果你想做一些需要异步的更复杂的事情:
Mono<Void> handleError(Throwable th, Object element) {
return doSomethingThaReturnFluxOrMono(element).then();
}