Reactive Java OneError 恢复错误处理
Reactive Java OneError Resume error handling
尝试保存事件有这个流程(回购是反应性的,这只是测试的示例代码。我是新反应性的,我正在使用 io.projectreactor (3.3))
- 验证事件,失败时写入历史记录
- 如果验证成功,将事件写入 repo,任何失败写入历史
- 如果验证失败写入历史记录
- 诱导一些失败来模拟错误条件
import reactor.core.publisher.Mono;
public class MyTest {
static int counter = 0;
public static void main(String args[]) throws InterruptedException
{
String array[] = {"1","2","3","4",null,"5"};
for(int i =0; i < 5; i++)
{
System.out.println("input:: "+array[i]);
new MyTest().createMessage(array[i]);
counter++;
Thread.sleep(500);
}
}
private void createMessage(String input)
{
new MyTest().onMessage(input)
.doOnSuccess(s -> System.out.println("----done::success-----"))
.onErrorResume(e ->
{System.out.println("---done::error --creatMessage::doOnError:: caused by "+e);
return Mono.empty();})
.subscribe();
}
private Mono<String> onMessage(String input)
{
return Mono.create(sink -> {
validate()
.onErrorResume(e -> {
System.out.println("error onMessage:: fail to validate");
sink.error(e);
return Mono.error(e);
})
.flatMap(a -> processObject(input))
.flatMap(h -> {
System.out.println("success onMessage :: save history");
new Service().saveHistory(input, false);
sink.success();
return Mono.just(h);
})
.subscribe();
});
}
private Mono<String> processObject(String input)
{
return Mono.create(sink -> {
new Service().saveEvent(input).flatMap(a -> {
System.out.println("success processObject");
sink.success(a);
return Mono.just(a);
}).onErrorResume(e -> {
new Service().saveHistory(input, true);
System.out.println("error processObject");
sink.error(e);
return Mono.error(e);
}).subscribe();
});
}
private Mono<String> validate()
{
counter++;
return Mono.create(sink -> {
if (counter % 3 == 0)
{
sink.error(new RuntimeException("Validate method error"));
return;
}
sink.success("validate is done ");
return;
});
}
}
服务Class
public class Service {
public Mono<String> saveEvent(String id)
{
return save(id)
.onErrorResume(e -> {
System.out.println("Error in save event");
return Mono.error(e);
}).doOnNext(e -> System.out.println("save event"));
}
public Mono<String> saveHistory(String id, boolean error)
{
return save(id)
.onErrorResume(e -> {
System.out.println("Error in save history");
return Mono.error(e);
}).doOnNext(e -> System.out.println("save history"));
}
public Mono<String> save(String id)
{
if (id == null)
{
throw new RuntimeException("Error saving");
}
return Mono.just("save success");
}
}
我遇到了这个异常
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Validate method error
Caused by: java.lang.RuntimeException: Validate method error
at sample.MyTest.lambda$validate(MyTest.java:77)
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4216)
at reactor.core.publisher.Mono.subscribe(Mono.java:3942)
at sample.MyTest.lambda$onMessage(MyTest.java:49)
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4216)
at reactor.core.publisher.Mono.subscribe(Mono.java:3942)
at sample.MyTest.createMessage(MyTest.java:30)
at sample.MyTest.main(MyTest.java:18)
更新的工作代码:基于@Michael Berry 评论
public static void main(String args[]) throws InterruptedException
{
String array[] = {"1","2","3","4",null,"5"};
for(int i =0; i < 5; i++)
{
System.out.println("input:: "+array[i]);
new MyTest().createMessage(array[i]);
counter++;
Thread.sleep(500);
}
}
private void createMessage(String input)
{
new MyTest().onMessage(input)
.doOnSuccess(s -> System.out.println("----done::success-----"))
.onErrorResume(e ->
{
System.out.println("---done::error --creatMessage::doOnError:: caused by "+e);
return Mono.empty();
})
.subscribe();
}
private Mono<String> onMessage(String input) {
return validate()
.onErrorResume(e -> {
System.out.println("error onMessage:: fail to validate");
return Mono.error(e);
})
.flatMap(a -> processObject(input))
.flatMap(h -> {
System.out.println("success onMessage :: save history");
new Service().saveHistory(input, false);
return Mono.just(h);
});
}
private Mono<String> processObject(String input)
{
return new Service().saveEvent(input).flatMap(a -> {
System.out.println("success processObject");
return Mono.just(a);
}).onErrorResume(e -> {
new Service().saveHistory(input, true);
System.out.println("error processObject");
return Mono.error(e);
});
}
private Mono<String> validate()
{
counter++;
if (counter % 3 == 0)
{
return Mono.error(new RuntimeException("Validate method error"));
}
return Mono.just("validate is done ");
}
结果
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 2
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
input:: 3
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 4
save event
success processObject
success onMessage :: save history
----done::success-----
input:: null
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
由于您的 onMessage()
实施,您在此处遇到错误,这有点奇怪:
- 您没有理由在
Mono.create()
中包装 Mono
;
- 您自己订阅了这个内部发布者 - 这几乎总是错误的做法,而且不一定会按照您的预期进行(订阅发布者应该由框架处理,而不是您的代码。)在在这种情况下,关键是它意味着它是单独处理的,而不是你的反应链的一部分,所以你的错误处理可能没有像你期望的那样映射到内部发布者;
- 你的
onErrorResume()
调用这个内部发布者本身 returns 一个错误,并且这个内部发布者没有其他错误处理 - 因此为什么那个错误没有被处理,所以它然后打印出堆栈跟踪你看到的。
相反,您很可能希望您的 onMessage()
方法这样读:
private Mono<String> onMessage(String input) {
return validate()
.onErrorResume(e -> {
System.out.println("error onMessage:: fail to validate");
return Mono.error(e);
})
.flatMap(a -> processObject(input))
.flatMap(h -> {
System.out.println("success onMessage :: save history");
new Service().saveHistory(input, false);
return Mono.just(h);
});
}
...没有 Mono.create()
(这实际上只是为了兼容性目的由非反应器回调 API 使用。)您进行此更改后的输出如下所示:
input:: 1
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 2
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
input:: 3
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 4
save event
success processObject
success onMessage :: save history
----done::success-----
input:: null
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
尝试保存事件有这个流程(回购是反应性的,这只是测试的示例代码。我是新反应性的,我正在使用 io.projectreactor (3.3))
- 验证事件,失败时写入历史记录
- 如果验证成功,将事件写入 repo,任何失败写入历史
- 如果验证失败写入历史记录
- 诱导一些失败来模拟错误条件
import reactor.core.publisher.Mono;
public class MyTest {
static int counter = 0;
public static void main(String args[]) throws InterruptedException
{
String array[] = {"1","2","3","4",null,"5"};
for(int i =0; i < 5; i++)
{
System.out.println("input:: "+array[i]);
new MyTest().createMessage(array[i]);
counter++;
Thread.sleep(500);
}
}
private void createMessage(String input)
{
new MyTest().onMessage(input)
.doOnSuccess(s -> System.out.println("----done::success-----"))
.onErrorResume(e ->
{System.out.println("---done::error --creatMessage::doOnError:: caused by "+e);
return Mono.empty();})
.subscribe();
}
private Mono<String> onMessage(String input)
{
return Mono.create(sink -> {
validate()
.onErrorResume(e -> {
System.out.println("error onMessage:: fail to validate");
sink.error(e);
return Mono.error(e);
})
.flatMap(a -> processObject(input))
.flatMap(h -> {
System.out.println("success onMessage :: save history");
new Service().saveHistory(input, false);
sink.success();
return Mono.just(h);
})
.subscribe();
});
}
private Mono<String> processObject(String input)
{
return Mono.create(sink -> {
new Service().saveEvent(input).flatMap(a -> {
System.out.println("success processObject");
sink.success(a);
return Mono.just(a);
}).onErrorResume(e -> {
new Service().saveHistory(input, true);
System.out.println("error processObject");
sink.error(e);
return Mono.error(e);
}).subscribe();
});
}
private Mono<String> validate()
{
counter++;
return Mono.create(sink -> {
if (counter % 3 == 0)
{
sink.error(new RuntimeException("Validate method error"));
return;
}
sink.success("validate is done ");
return;
});
}
}
服务Class
public class Service {
public Mono<String> saveEvent(String id)
{
return save(id)
.onErrorResume(e -> {
System.out.println("Error in save event");
return Mono.error(e);
}).doOnNext(e -> System.out.println("save event"));
}
public Mono<String> saveHistory(String id, boolean error)
{
return save(id)
.onErrorResume(e -> {
System.out.println("Error in save history");
return Mono.error(e);
}).doOnNext(e -> System.out.println("save history"));
}
public Mono<String> save(String id)
{
if (id == null)
{
throw new RuntimeException("Error saving");
}
return Mono.just("save success");
}
}
我遇到了这个异常
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Validate method error
Caused by: java.lang.RuntimeException: Validate method error
at sample.MyTest.lambda$validate(MyTest.java:77)
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4216)
at reactor.core.publisher.Mono.subscribe(Mono.java:3942)
at sample.MyTest.lambda$onMessage(MyTest.java:49)
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4216)
at reactor.core.publisher.Mono.subscribe(Mono.java:3942)
at sample.MyTest.createMessage(MyTest.java:30)
at sample.MyTest.main(MyTest.java:18)
更新的工作代码:基于@Michael Berry 评论
public static void main(String args[]) throws InterruptedException
{
String array[] = {"1","2","3","4",null,"5"};
for(int i =0; i < 5; i++)
{
System.out.println("input:: "+array[i]);
new MyTest().createMessage(array[i]);
counter++;
Thread.sleep(500);
}
}
private void createMessage(String input)
{
new MyTest().onMessage(input)
.doOnSuccess(s -> System.out.println("----done::success-----"))
.onErrorResume(e ->
{
System.out.println("---done::error --creatMessage::doOnError:: caused by "+e);
return Mono.empty();
})
.subscribe();
}
private Mono<String> onMessage(String input) {
return validate()
.onErrorResume(e -> {
System.out.println("error onMessage:: fail to validate");
return Mono.error(e);
})
.flatMap(a -> processObject(input))
.flatMap(h -> {
System.out.println("success onMessage :: save history");
new Service().saveHistory(input, false);
return Mono.just(h);
});
}
private Mono<String> processObject(String input)
{
return new Service().saveEvent(input).flatMap(a -> {
System.out.println("success processObject");
return Mono.just(a);
}).onErrorResume(e -> {
new Service().saveHistory(input, true);
System.out.println("error processObject");
return Mono.error(e);
});
}
private Mono<String> validate()
{
counter++;
if (counter % 3 == 0)
{
return Mono.error(new RuntimeException("Validate method error"));
}
return Mono.just("validate is done ");
}
结果
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 2
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
input:: 3
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 4
save event
success processObject
success onMessage :: save history
----done::success-----
input:: null
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
由于您的 onMessage()
实施,您在此处遇到错误,这有点奇怪:
- 您没有理由在
Mono.create()
中包装Mono
; - 您自己订阅了这个内部发布者 - 这几乎总是错误的做法,而且不一定会按照您的预期进行(订阅发布者应该由框架处理,而不是您的代码。)在在这种情况下,关键是它意味着它是单独处理的,而不是你的反应链的一部分,所以你的错误处理可能没有像你期望的那样映射到内部发布者;
- 你的
onErrorResume()
调用这个内部发布者本身 returns 一个错误,并且这个内部发布者没有其他错误处理 - 因此为什么那个错误没有被处理,所以它然后打印出堆栈跟踪你看到的。
相反,您很可能希望您的 onMessage()
方法这样读:
private Mono<String> onMessage(String input) {
return validate()
.onErrorResume(e -> {
System.out.println("error onMessage:: fail to validate");
return Mono.error(e);
})
.flatMap(a -> processObject(input))
.flatMap(h -> {
System.out.println("success onMessage :: save history");
new Service().saveHistory(input, false);
return Mono.just(h);
});
}
...没有 Mono.create()
(这实际上只是为了兼容性目的由非反应器回调 API 使用。)您进行此更改后的输出如下所示:
input:: 1
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 2
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
input:: 3
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 4
save event
success processObject
success onMessage :: save history
----done::success-----
input:: null
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error