Spring Reactive MongoDB: how to catch runtime exceptions?

I am trying to intercept MongoDB's DuplicateKeyException in Spring Boot so that the logs are not flooded with such an oversized stack trace.

This is how I am trying to catch the exception:

@Scheduled(fixedDelay = Milliseconds.MONTH)
private void synchronizeForbesOrganizations() {
    System.out.println("Forbes Organizations Synchronizer");

    this.forbesOrganizationService.save(this.forbesAPIService.getGlobal2000())
            .take(ForbesAPI.Global2000.SIZE)
            .subscribeOn(Schedulers.parallel())
            .onErrorResume(e -> {
                System.out.println("On Error Resume");
                System.out.println(e.getMessage());

                return Flux.empty();
            })
            .subscribe();
}

However, even though the onErrorResume method is invoked, I still get the full stack trace below:

2020-08-19 13:07:07.514 ERROR 34762 --- [ntLoopGroup-3-3] reactor.core.publisher.Operators         : Operator called default onErrorDropped

org.springframework.dao.DuplicateKeyException: E11000 duplicate key error collection: radar.forbesOrganization index: OrganizationYear_UK dup key: { organizationName: "China Construction Bank", year: 2020 }; nested exception is com.mongodb.MongoWriteException: E11000 duplicate key error collection: radar.forbesOrganization index: OrganizationYear_UK dup key: { organizationName: "China Construction Bank", year: 2020 }
    at org.springframework.data.mongodb.core.MongoExceptionTranslator.translateExceptionIfPossible(MongoExceptionTranslator.java:99) ~[spring-data-mongodb-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.data.mongodb.core.ReactiveMongoTemplate.potentiallyConvertRuntimeException(ReactiveMongoTemplate.java:2814) ~[spring-data-mongodb-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.data.mongodb.core.ReactiveMongoTemplate.lambda$translateException(ReactiveMongoTemplate.java:2797) ~[spring-data-mongodb-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at reactor.core.publisher.Flux.lambda$onErrorMap(Flux.java:6504) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:88) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:247) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at com.mongodb.reactivestreams.client.internal.AbstractSubscription.onError(AbstractSubscription.java:142) ~[mongodb-driver-reactivestreams-4.0.5.jar:na]
    at com.mongodb.reactivestreams.client.internal.SingleResultCallbackSubscription.lambda$requestInitialData$0(SingleResultCallbackSubscription.java:41) ~[mongodb-driver-reactivestreams-4.0.5.jar:na]
    at com.mongodb.internal.async.client.AsyncMongoCollectionImpl.lambda$executeInsertOne$0(AsyncMongoCollectionImpl.java:467) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.async.client.AsyncMongoCollectionImpl.lambda$executeSingleWriteRequest(AsyncMongoCollectionImpl.java:1074) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.async.client.OperationExecutorImpl.onResult(OperationExecutorImpl.java:138) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.operation.OperationHelper$ConnectionReleasingWrappedCallback.onResult(OperationHelper.java:431) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.operation.MixedBulkWriteOperation.addBatchResult(MixedBulkWriteOperation.java:509) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.operation.MixedBulkWriteOperation.access00(MixedBulkWriteOperation.java:71) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.operation.MixedBulkWriteOperation.onResult(MixedBulkWriteOperation.java:491) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.operation.MixedBulkWriteOperation.onResult(MixedBulkWriteOperation.java:463) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.onResult(DefaultServer.java:251) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.connection.CommandProtocolImpl.onResult(CommandProtocolImpl.java:84) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.onResult(DefaultConnectionPool.java:517) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.connection.UsageTrackingInternalConnection.onResult(UsageTrackingInternalConnection.java:111) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.onResult(InternalStreamConnection.java:398) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.onResult(InternalStreamConnection.java:375) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:676) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:643) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.completed(InternalStreamConnection.java:513) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.completed(InternalStreamConnection.java:510) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.connection.netty.NettyStream.readAsync(NettyStream.java:232) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:510) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.access00(InternalStreamConnection.java:75) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:633) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:618) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.completed(InternalStreamConnection.java:513) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.completed(InternalStreamConnection.java:510) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.connection.netty.NettyStream.readAsync(NettyStream.java:232) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.connection.netty.NettyStream.handleReadResponse(NettyStream.java:262) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.connection.netty.NettyStream.access0(NettyStream.java:69) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.connection.netty.NettyStream$InboundBufferHandler.channelRead0(NettyStream.java:321) ~[mongodb-driver-core-4.0.5.jar:na]
    at com.mongodb.connection.netty.NettyStream$InboundBufferHandler.channelRead0(NettyStream.java:318) ~[mongodb-driver-core-4.0.5.jar:na]
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: com.mongodb.MongoWriteException: E11000 duplicate key error collection: radar.forbesOrganization index: OrganizationYear_UK dup key: { organizationName: "China Construction Bank", year: 2020 }
    at com.mongodb.internal.async.client.AsyncMongoCollectionImpl.lambda$executeSingleWriteRequest(AsyncMongoCollectionImpl.java:1075) ~[mongodb-driver-core-4.0.5.jar:na]
    ... 50 common frames omitted

How can I catch such an exception so that only a user-friendly message is logged instead of the whole stack trace?

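The log line "Operator called default onErrorDropped" suggests that onErrorResume handled the first error and ended the sequence, after which errors from the remaining inserts had no subscriber left to receive them, so Reactor dropped and logged them. The exception can instead be handled with the onErrorContinue(...) operator before subscribing to the Flux, which lets supporting upstream operators skip the failing element and keep going: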

@Scheduled(fixedDelay = Milliseconds.MONTH)
private void synchronizeForbesOrganizations() {
    this.forbesOrganizationService.save(this.forbesAPIService.getGlobal2000())
            .take(ForbesAPI.Global2000.SIZE)
            .subscribeOn(Schedulers.parallel())
            .onErrorContinue(DuplicateKeyException.class, (e, o) -> System.out.println(e.getMessage()))
            .subscribe();
}
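
To make the difference between the two operators concrete, here is a minimal, self-contained sketch. It assumes reactor-core is on the classpath. The class name, the in-memory save stand-in, and the use of IllegalStateException in place of DuplicateKeyException are all hypothetical and exist only for illustration; this is not the service code from the question.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import reactor.core.publisher.Flux;

// Illustration only: an in-memory set stands in for the MongoDB collection.
class ErrorHandlingSketch {

    // Hypothetical "save": throws if the name was already stored,
    // roughly as a unique index would reject a duplicate.
    static String save(Set<String> stored, String name) {
        if (!stored.add(name)) {
            throw new IllegalStateException("duplicate key: " + name);
        }
        return name;
    }

    // onErrorContinue: the failing element is skipped, the rest are still processed.
    static List<String> saveSkippingDuplicates(List<String> names) {
        Set<String> stored = new HashSet<>();
        return Flux.fromIterable(names)
                .map(name -> save(stored, name))
                .onErrorContinue(IllegalStateException.class,
                        (e, value) -> System.out.println("Skipped " + value + ": " + e.getMessage()))
                .collectList()
                .block();
    }

    // onErrorResume: the first error ends the sequence and the fallback takes over.
    static List<String> saveStoppingAtFirstError(List<String> names) {
        Set<String> stored = new HashSet<>();
        return Flux.fromIterable(names)
                .map(name -> save(stored, name))
                .onErrorResume(e -> Flux.empty())
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        List<String> names = List.of("A", "B", "A", "C");
        System.out.println(saveSkippingDuplicates(names));   // [A, B, C]
        System.out.println(saveStoppingAtFirstError(names)); // [A, B]
    }
}
```

Note that onErrorContinue only takes effect for upstream operators that support it, so it is worth checking the behaviour against the actual pipeline rather than relying on this sketch.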

More information about error handling in Reactor can be found in the Reactor reference documentation.
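
Since the goal is a short, readable log line, the message handed to the error callback can also be condensed before it is printed. The following is a rough sketch using only the JDK. The class and method names are hypothetical, and the regular expression is derived solely from the E11000 message shown in the stack trace above, whose format may differ between MongoDB versions. When the pattern does not match, the original message is returned unchanged.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: turns a verbose E11000 message into a one-line summary.
class DuplicateKeyMessages {

    // Captures the index name and the duplicated key document from the message.
    private static final Pattern DUP_KEY =
            Pattern.compile("index: (\\S+) dup key: (\\{[^}]*\\})");

    static String summarize(String message) {
        if (message == null) {
            return "";
        }
        Matcher m = DUP_KEY.matcher(message);
        if (m.find()) {
            return "Duplicate entry (index " + m.group(1) + "): " + m.group(2);
        }
        // Unknown format: fall back to the raw message rather than guessing.
        return message;
    }

    public static void main(String[] args) {
        String raw = "E11000 duplicate key error collection: radar.forbesOrganization "
                + "index: OrganizationYear_UK dup key: "
                + "{ organizationName: \"China Construction Bank\", year: 2020 }";
        System.out.println(summarize(raw));
    }
}
```

Inside the error callback this could be used as System.out.println(DuplicateKeyMessages.summarize(e.getMessage())), again assuming the helper above.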